Copilot commented on code in PR #1812:
URL: https://github.com/apache/fluss/pull/1812#discussion_r2431665171
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
Review Comment:
The constructor change from no-args to requiring `LakeCatalog` is a breaking
change. Consider adding a deprecated no-args constructor for backward
compatibility, or ensuring that all existing usages are updated.
```suggestion
    /**
     * Deprecated no-argument constructor for backward compatibility.
     * lakeCatalog will be null. Do not use in new code.
     */
    @Deprecated
    public LakeTableFactory() {
        this.lakeCatalog = null;
    }
```
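If the deprecated constructor is kept, the lake read path should fail fast when no catalog was supplied rather than surface a `NullPointerException` later. A minimal sketch of such a guard (the helper name is hypothetical, not part of the PR):
```java
// Hypothetical guard inside LakeTableFactory: rejects lake reads when the
// deprecated no-args constructor was used and lakeCatalog is therefore null.
private LakeCatalog requireLakeCatalog() {
    if (lakeCatalog == null) {
        throw new IllegalStateException(
                "LakeTableFactory was created without a LakeCatalog; "
                        + "use LakeTableFactory(LakeCatalog) to read lake tables.");
    }
    return lakeCatalog;
}
```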
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
        }
    }
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+        // wait until records have been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Test 1: Read Iceberg lake table directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+
+        // Note: The expected row count should be based on how many rows were written
+        // In preparePKTableFullType, we write 2 unique rows (by PK) per iteration, 2 iterations
+        // Since this is a primary key table, duplicate PKs are deduplicated, so only 2 unique rows
+        // per partition
+        int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
+        assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+        // verify rows have expected number of columns (user columns + potential Iceberg metadata)
+        int userColumnCount = 16; // The table has 16 columns
+        Row firstRow = icebergRows.get(0);
+        assertThat(firstRow.getArity())
+                .as("Iceberg row should have at least user columns")
+                .isGreaterThanOrEqualTo(userColumnCount);
+
+        // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots suffix
+        TableResult snapshotsResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake$snapshots", tableName));
+        List<Row> snapshotRows = CollectionUtil.iteratorToList(snapshotsResult.collect());
+
+        // Verify that we can read snapshots from Iceberg via $lake$snapshots suffix
+        assertThat(snapshotRows).as("Should have at least one snapshot").isNotEmpty();
+
+        // Verify snapshot structure based on Iceberg snapshots table schema
+        // Expected columns: committed_at, snapshot_id, parent_id, operation, manifest_list, summary
+        Row firstSnapshot = snapshotRows.get(0);
+        assertThat(firstSnapshot.getArity()).as("Snapshot row should have 6 columns").isEqualTo(6);
Review Comment:
Magic number 6 should be extracted as a named constant representing the
expected Iceberg snapshots table column count for better maintainability.
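A minimal sketch of how the constant could look in the test class; the constant name is illustrative, and the value simply mirrors the six snapshot columns listed in the test comment:
```java
// Illustrative constant (name is not from the PR): the Iceberg snapshots metadata
// table exposes committed_at, snapshot_id, parent_id, operation, manifest_list, summary.
private static final int ICEBERG_SNAPSHOTS_COLUMN_COUNT = 6;

// The assertion can then reference the constant instead of a literal:
assertThat(firstSnapshot.getArity())
        .as("Snapshot row should have %d columns", ICEBERG_SNAPSHOTS_COLUMN_COUNT)
        .isEqualTo(ICEBERG_SNAPSHOTS_COLUMN_COUNT);
```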
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -68,17 +69,29 @@
/** Factory to create table source and table sink for Fluss. */
public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+    private final LakeCatalog lakeCatalog;
    private volatile LakeTableFactory lakeTableFactory;
+    public FlinkTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
+    }
+
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // check whether should read from datalake
        ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
        String tableName = tableIdentifier.getObjectName();
        if (tableName.contains(LAKE_TABLE_SPLITTER)) {
-            tableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            // Extract the lake table name: for "table$lake" -> "table"
+            // for "table$lake$snapshots" -> "table$snapshots"
+            String baseTableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            String suffix =
+                    tableName.substring(
+                            tableName.indexOf(LAKE_TABLE_SPLITTER) + LAKE_TABLE_SPLITTER.length());
+            String lakeTableName = suffix.isEmpty() ? baseTableName : baseTableName + suffix;
Review Comment:
The string manipulation logic is complex and could be error-prone. Consider
extracting this into a helper method with clear documentation and unit tests to
handle edge cases.
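A possible shape for such a helper, sketched only to illustrate the suggestion (the method name and javadoc are not part of the PR):
```java
/**
 * Strips the {@code LAKE_TABLE_SPLITTER} marker from a Flink object name.
 *
 * <p>Examples: {@code "table$lake"} becomes {@code "table"}, and
 * {@code "table$lake$snapshots"} becomes {@code "table$snapshots"}.
 */
private static String stripLakeSplitter(String tableName) {
    int splitterIndex = tableName.indexOf(LAKE_TABLE_SPLITTER);
    String baseTableName = tableName.substring(0, splitterIndex);
    String suffix = tableName.substring(splitterIndex + LAKE_TABLE_SPLITTER.length());
    return suffix.isEmpty() ? baseTableName : baseTableName + suffix;
}
```
Unit tests could then cover names such as a plain lake table, a lake system table, and a table name that happens to contain the splitter in the middle.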
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    public LakeTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
    }
    public DynamicTableSource createDynamicTableSource(
            DynamicTableFactory.Context context, String tableName) {
        ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                ObjectIdentifier.of(
                        originIdentifier.getCatalogName(),
                        originIdentifier.getDatabaseName(),
                        tableName);
+
+        // Determine the lake format from the table options
+        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+        // If not present, fallback to 'fluss.table.datalake.format' (set by Fluss)
+        String connector = tableOptions.get("connector");
+        if (connector == null) {
+            connector = tableOptions.get("fluss.table.datalake.format");
+        }
+
+        if (connector == null) {
+            // For Paimon system tables (like table_name$options), the table options are empty
+            // Default to Paimon for backward compatibility
+            connector = "paimon";
+        }
Review Comment:
Hard-coding the default to 'paimon' may cause issues when Iceberg becomes
the primary format. Consider making this configurable or determining the
default based on system configuration.
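One hedged way to avoid the literal is to thread an explicit default through the factory instead of hard-coding it. The resolver below is a sketch only; `defaultLakeFormat` is a hypothetical value that would be injected from cluster or catalog configuration:
```java
// Sketch (not from the PR): resolve the lake connector with a configurable default.
// `defaultLakeFormat` is hypothetical and would come from configuration, e.g. the
// cluster's datalake format, rather than being the literal "paimon".
private static String resolveLakeConnector(
        Map<String, String> tableOptions, String defaultLakeFormat) {
    String connector = tableOptions.get("connector");
    if (connector == null) {
        connector = tableOptions.get("fluss.table.datalake.format");
    }
    return connector != null ? connector : defaultLakeFormat;
}
```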
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
        }
    }
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+        // wait until records have been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Test 1: Read Iceberg lake table directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+
+        // Note: The expected row count should be based on how many rows were written
+        // In preparePKTableFullType, we write 2 unique rows (by PK) per iteration, 2 iterations
+        // Since this is a primary key table, duplicate PKs are deduplicated, so only 2 unique rows
+        // per partition
+        int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
+        assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+        // verify rows have expected number of columns (user columns + potential Iceberg metadata)
+        int userColumnCount = 16; // The table has 16 columns
Review Comment:
Magic number 16 should be extracted as a named constant or calculated
dynamically from the table schema to avoid maintenance issues when the table
structure changes.
```suggestion
        int userColumnCount =
                lakeTableResult.getResolvedSchema().getColumnCount(); // Dynamically get column count
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]