luoyuxia commented on code in PR #1812:
URL: https://github.com/apache/fluss/pull/1812#discussion_r2431744136


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -68,17 +69,29 @@
 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
+    private final LakeCatalog lakeCatalog;
     private volatile LakeTableFactory lakeTableFactory;
 
+    public FlinkTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
+    }
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         // check whether should read from datalake
         ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
         String tableName = tableIdentifier.getObjectName();
         if (tableName.contains(LAKE_TABLE_SPLITTER)) {
-            tableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            // Extract the lake table name: for "table$lake" -> "table"
+            // for "table$lake$snapshots" -> "table$snapshots"
+            String baseTableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));

Review Comment:
   nit: can be replaced with
   ```
   String lakeTableName = tableName.replaceFirst("\\$lake", "");
   ```
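
   A quick standalone check (names are made up, not from the PR) that the one-liner covers both shapes described in the new code comment above:
   ```
   String lake = "orders$lake".replaceFirst("\\$lake", "");          // -> "orders"
   String sys = "orders$lake$snapshots".replaceFirst("\\$lake", ""); // -> "orders$snapshots"
   ```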



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
 
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    public LakeTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
     }
 
     public DynamicTableSource createDynamicTableSource(
             DynamicTableFactory.Context context, String tableName) {
         ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                 ObjectIdentifier.of(
                         originIdentifier.getCatalogName(),
                         originIdentifier.getDatabaseName(),
                         tableName);
+
+        // Determine the lake format from the table options
+        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+        // If not present, fall back to 'fluss.table.datalake.format' (set by Fluss)

Review Comment:
   This piece of logic is hacky. If an Iceberg table doesn't put `connector` = `iceberg` into its table options, the logic will fail. I created #1814 to track it. For now, let's keep it in this PR.
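
   To make the concern concrete, a minimal standalone sketch (hypothetical, mirroring the fallback chain in this hunk) of how an Iceberg table that writes neither key gets misrouted:
   ```
   Map<String, String> tableOptions = new HashMap<>(); // Iceberg table, but no 'connector' key
   String connector = tableOptions.get("connector");
   if (connector == null) {
       connector = tableOptions.get("fluss.table.datalake.format");
   }
   if (connector == null) {
       connector = "paimon"; // silently falls through to the Paimon factory
   }
   ```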



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java:
##########
@@ -273,6 +276,76 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {

Review Comment:
   Is it possible to fold this test case into `testUnionReadInStreamMode` to reduce test code and runtime, just like we did in `org.apache.fluss.lake.paimon.flink.FlinkUnionReadPrimaryKeyTableITCase.testUnionReadFullType`?
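
   A rough sketch of what folding it in could look like; `batchTEnv` and `tableName` are placeholders for whatever the existing test already sets up:
   ```
   // inside the existing testUnionReadInStreamMode(isPartitioned),
   // after the current union-read assertions:
   try (CloseableIterator<Row> rows =
           batchTEnv.executeSql("SELECT * FROM " + tableName + "$lake").collect()) {
       assertThat(rows.hasNext()).isTrue();
   }
   ```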



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -20,37 +20,110 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
 
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    public LakeTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
     }
 
     public DynamicTableSource createDynamicTableSource(
             DynamicTableFactory.Context context, String tableName) {
         ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                 ObjectIdentifier.of(
                         originIdentifier.getCatalogName(),
                         originIdentifier.getDatabaseName(),
                         tableName);
+
+        // Determine the lake format from the table options
+        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+        // If not present, fall back to 'fluss.table.datalake.format' (set by Fluss)
+        String connector = tableOptions.get("connector");
+        if (connector == null) {
+            connector = tableOptions.get("fluss.table.datalake.format");
+        }
+
+        if (connector == null) {
+            // For Paimon system tables (like table_name$options), the table options are empty
+            // Default to Paimon for backward compatibility
+            connector = "paimon";
+        }
+
+        // For Iceberg and Paimon, pass the table name as-is to their factory.
+        // Metadata tables will be handled internally by their respective factories.
         DynamicTableFactory.Context newContext =
                 new FactoryUtil.DefaultDynamicTableContext(
-                        paimonIdentifier,
+                        lakeIdentifier,
                         context.getCatalogTable(),
                         context.getEnrichmentOptions(),
                         context.getConfiguration(),
                         context.getClassLoader(),
                         context.isTemporary());
 
-        return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+        // Get the appropriate factory based on connector type
+        DynamicTableSourceFactory factory = getLakeTableFactory(connector, tableOptions);
+        return factory.createDynamicTableSource(newContext);
+    }
+
+    private DynamicTableSourceFactory getLakeTableFactory(
+            String connector, Map<String, String> tableOptions) {
+        if ("paimon".equalsIgnoreCase(connector)) {
+            return getPaimonFactory();
+        } else if ("iceberg".equalsIgnoreCase(connector)) {
+            return getIcebergFactory(tableOptions);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported lake connector: "
+                            + connector
+                            + ". Only 'paimon' and 'iceberg' are supported.");
+        }
+    }
+
+    private DynamicTableSourceFactory getPaimonFactory() {
+        try {
+            Class<?> paimonFactoryClass =

Review Comment:
   For Paimon, we don't need to use reflection.
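
   A minimal sketch of the suggestion: the pre-change code constructed the Paimon factory directly, so (assuming the Paimon Flink dependency stays on this module's compile classpath) the reflective lookup can become:
   ```
   private DynamicTableSourceFactory getPaimonFactory() {
       // direct instantiation, as the removed code did, instead of Class.forName
       return new org.apache.paimon.flink.FlinkTableFactory();
   }
   ```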


