Copilot commented on code in PR #1812:
URL: https://github.com/apache/fluss/pull/1812#discussion_r2431665171


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
 

Review Comment:
   The constructor change from no-args to requiring `LakeCatalog` is a breaking change. Consider adding a deprecated no-args constructor for backward compatibility, or ensuring that all existing usages are updated.
   ```suggestion
   
       /**
        * Deprecated no-argument constructor for backward compatibility.
        * lakeCatalog will be null. Do not use in new code.
        */
       @Deprecated
       public LakeTableFactory() {
           this.lakeCatalog = null;
       }
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+        // wait until records have been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Test 1: Read Iceberg lake table directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+
+        // Note: The expected row count should be based on how many rows were written
+        // In preparePKTableFullType, we write 2 unique rows (by PK) per iteration, 2 iterations
+        // Since this is a primary key table, duplicate PKs are deduplicated, so only 2 unique rows
+        // per partition
+        int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
+        assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+        // verify rows have expected number of columns (user columns + potential Iceberg metadata)
+        int userColumnCount = 16; // The table has 16 columns
+        Row firstRow = icebergRows.get(0);
+        assertThat(firstRow.getArity())
+                .as("Iceberg row should have at least user columns")
+                .isGreaterThanOrEqualTo(userColumnCount);
+
+        // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots suffix
+        TableResult snapshotsResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake$snapshots", tableName));
+        List<Row> snapshotRows = CollectionUtil.iteratorToList(snapshotsResult.collect());
+
+        // Verify that we can read snapshots from Iceberg via $lake$snapshots suffix
+        assertThat(snapshotRows).as("Should have at least one snapshot").isNotEmpty();
+
+        // Verify snapshot structure based on Iceberg snapshots table schema
+        // Expected columns: committed_at, snapshot_id, parent_id, operation, manifest_list, summary
+        Row firstSnapshot = snapshotRows.get(0);
+        assertThat(firstSnapshot.getArity()).as("Snapshot row should have 6 columns").isEqualTo(6);

Review Comment:
   The magic number 6 should be extracted into a named constant representing the expected column count of the Iceberg snapshots table, for better maintainability.
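   A hedged sketch of what that could look like (the constant name is illustrative, not something defined in this PR):
   ```java
   // Column count of Iceberg's snapshots metadata table:
   // committed_at, snapshot_id, parent_id, operation, manifest_list, summary
   private static final int SNAPSHOTS_TABLE_COLUMN_COUNT = 6;

   // ...
   assertThat(firstSnapshot.getArity())
           .as("Snapshot row should have %d columns", SNAPSHOTS_TABLE_COLUMN_COUNT)
           .isEqualTo(SNAPSHOTS_TABLE_COLUMN_COUNT);
   ```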



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -68,17 +69,29 @@
 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
+    private final LakeCatalog lakeCatalog;
     private volatile LakeTableFactory lakeTableFactory;
 
+    public FlinkTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
+    }
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         // check whether should read from datalake
         ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
         String tableName = tableIdentifier.getObjectName();
         if (tableName.contains(LAKE_TABLE_SPLITTER)) {
-            tableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            // Extract the lake table name: for "table$lake" -> "table"
+            // for "table$lake$snapshots" -> "table$snapshots"
+            String baseTableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            String suffix =
+                    tableName.substring(
+                            tableName.indexOf(LAKE_TABLE_SPLITTER) + LAKE_TABLE_SPLITTER.length());
+            String lakeTableName = suffix.isEmpty() ? baseTableName : baseTableName + suffix;

Review Comment:
   The string manipulation logic is complex and could be error-prone. Consider extracting it into a helper method with clear documentation and unit tests to handle edge cases.
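   For illustration only, one possible shape of such a helper (the method name is hypothetical):
   ```java
   /**
    * Strips the lake splitter from a Flink table name while keeping any trailing
    * system-table suffix, e.g. "table$lake" -> "table" and
    * "table$lake$snapshots" -> "table$snapshots".
    */
   private static String toLakeTableName(String tableName) {
       int splitterIndex = tableName.indexOf(LAKE_TABLE_SPLITTER);
       String baseTableName = tableName.substring(0, splitterIndex);
       String suffix = tableName.substring(splitterIndex + LAKE_TABLE_SPLITTER.length());
       return suffix.isEmpty() ? baseTableName : baseTableName + suffix;
   }
   ```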



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
 
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    public LakeTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
     }
 
     public DynamicTableSource createDynamicTableSource(
             DynamicTableFactory.Context context, String tableName) {
         ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                 ObjectIdentifier.of(
                         originIdentifier.getCatalogName(),
                         originIdentifier.getDatabaseName(),
                         tableName);
+
+        // Determine the lake format from the table options
+        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+        // If not present, fallback to 'fluss.table.datalake.format' (set by Fluss)
+        String connector = tableOptions.get("connector");
+        if (connector == null) {
+            connector = tableOptions.get("fluss.table.datalake.format");
+        }
+
+        if (connector == null) {
+            // For Paimon system tables (like table_name$options), the table options are empty
+            // Default to Paimon for backward compatibility
+            connector = "paimon";
+        }

Review Comment:
   Hard-coding the default to 'paimon' may cause issues when Iceberg becomes the primary format. Consider making this configurable or determining the default based on system configuration.
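   A rough sketch under the assumption that the factory can be handed a configured default format (the field and method names below are illustrative, not existing Fluss API):
   ```java
   // Sketch only: defaultLakeFormat would be supplied from catalog/cluster
   // configuration instead of hard-coding "paimon" here.
   private final String defaultLakeFormat;

   private String resolveLakeFormat(Map<String, String> tableOptions) {
       String format = tableOptions.get("connector");
       if (format == null) {
           // Fall back to the option Fluss sets on lake tables.
           format = tableOptions.get("fluss.table.datalake.format");
       }
       // Only use the configured default when the table options give no hint,
       // e.g. for Paimon system tables whose options are empty.
       return format != null ? format : defaultLakeFormat;
   }
   ```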



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+        // wait until records have been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Test 1: Read Iceberg lake table directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+
+        // Note: The expected row count should be based on how many rows were written
+        // In preparePKTableFullType, we write 2 unique rows (by PK) per iteration, 2 iterations
+        // Since this is a primary key table, duplicate PKs are deduplicated, so only 2 unique rows
+        // per partition
+        int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
+        assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+        // verify rows have expected number of columns (user columns + potential Iceberg metadata)
+        int userColumnCount = 16; // The table has 16 columns

Review Comment:
   Magic number 16 should be extracted as a named constant or calculated dynamically from the table schema to avoid maintenance issues when the table structure changes.
   ```suggestion
           int userColumnCount = lakeTableResult.getResolvedSchema().getColumnCount(); // Dynamically get column count
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
