Copilot commented on code in PR #1797:
URL: https://github.com/apache/fluss/pull/1797#discussion_r2425611092


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -250,6 +250,49 @@ void testUnionReadLogTableFailover(boolean isPartitioned) 
throws Exception {
         jobClient.cancel().get();
     }
 
+    // This test verifies that the $lake suffix works and data can be 
retrieved directly from
+    // Iceberg
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableDirectly(boolean isPartitioned) throws 
Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName =
+                "lake_direct_logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new ArrayList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Read Iceberg snapshot directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", 
tableName));
+        List<Row> icebergRows = 
CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+        assertThat(icebergRows).hasSize(writtenRows.size());
+
+        // Note: Iceberg adds metadata columns (_spec_id, _partition, _file) 
at the end.
+        // This test verifies that the $lake suffix works and data can be 
retrieved,

Review Comment:
   This comment is incomplete and ends abruptly with a comma. It should be 
completed or the trailing comma should be removed.
   ```suggestion
           // This test verifies that the $lake suffix works and data can be 
retrieved.
   ```



##########
fluss-lake/fluss-lake-iceberg/pom.xml:
##########
@@ -31,6 +31,25 @@
     <name>Fluss : Lake : Iceberg</name>
 
     <packaging>jar</packaging>
+
+    <!-- Override Jackson version for Iceberg compatibility in tests -->
+    <properties>
+        <jackson.version.test>2.18.3</jackson.version.test>
+    </properties>

Review Comment:
   The property `jackson.version.test` is defined but never used. The actual 
Jackson version override is done in the dependencyManagement section below. 
Either use this property in the dependencyManagement section or remove it.
   ```suggestion
   
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java:
##########
@@ -17,40 +17,86 @@
 
 package org.apache.fluss.flink.lake;
 
+import org.apache.fluss.metadata.DataLakeFormat;
+
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
 
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory 
paimonFlinkTableFactory;
+    private final Object delegateFactory;
+    private final DataLakeFormat lakeFormat;
+
+    public LakeTableFactory(DataLakeFormat lakeFormat, ClassLoader 
classLoader) {
+        this.lakeFormat = lakeFormat;
+
+        if (lakeFormat == DataLakeFormat.PAIMON) {
+            this.delegateFactory = createPaimonFlinkTableFactory(classLoader);
+        } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+            this.delegateFactory = createIcebergFlinkTableFactory(classLoader);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported lake format: "
+                            + lakeFormat
+                            + ". Only PAIMON and ICEBERG are supported.");
+        }
+    }
+
+    private Object createPaimonFlinkTableFactory(ClassLoader classLoader) {
+        try {
+            Class<?> paimonFactoryClass =
+                    
classLoader.loadClass("org.apache.paimon.flink.FlinkTableFactory");
+            return paimonFactoryClass.getDeclaredConstructor().newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create Paimon FlinkTableFactory. Make sure 
paimon-flink is on the classpath.",
+                    e);
+        }
+    }
 
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    private Object createIcebergFlinkTableFactory(ClassLoader classLoader) {
+        try {
+            Class<?> icebergFactoryClass =
+                    
classLoader.loadClass("org.apache.iceberg.flink.FlinkDynamicTableFactory");
+            return icebergFactoryClass.getDeclaredConstructor().newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create Iceberg FlinkDynamicTableFactory. Make 
sure iceberg-flink is on the classpath.",
+                    e);
+        }
     }
 
     public DynamicTableSource createDynamicTableSource(
             DynamicTableFactory.Context context, String tableName) {
         ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                 ObjectIdentifier.of(
                         originIdentifier.getCatalogName(),
                         originIdentifier.getDatabaseName(),
                         tableName);
         DynamicTableFactory.Context newContext =
                 new FactoryUtil.DefaultDynamicTableContext(
-                        paimonIdentifier,
+                        lakeIdentifier,
                         context.getCatalogTable(),
                         context.getEnrichmentOptions(),
                         context.getConfiguration(),
                         context.getClassLoader(),
                         context.isTemporary());
 
-        return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+        try {
+            // Use reflection to call createDynamicTableSource on the delegate 
factory
+            java.lang.reflect.Method createMethod =
+                    delegateFactory
+                            .getClass()
+                            .getMethod(
+                                    "createDynamicTableSource", 
DynamicTableFactory.Context.class);
+            return (DynamicTableSource) createMethod.invoke(delegateFactory, 
newContext);
+        } catch (Exception e) {

Review Comment:
   The reflection call should handle more specific exceptions. Consider 
catching ReflectiveOperationException instead of the generic Exception to 
better distinguish reflection-related errors from other potential issues.
   ```suggestion
           } catch (ReflectiveOperationException e) {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,124 @@
 
 package org.apache.fluss.flink.lake;
 
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.FlinkFileIOLoader;
-import org.apache.paimon.options.Options;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Map;
 
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
 
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
+    private final Catalog delegateCatalog;
+    private final DataLakeFormat lakeFormat;
 
     public LakeCatalog(
+            String catalogName,
+            Map<String, String> catalogProperties,
+            ClassLoader classLoader,
+            DataLakeFormat lakeFormat) {
+        this.lakeFormat = lakeFormat;
+
+        if (lakeFormat == DataLakeFormat.PAIMON) {
+            this.delegateCatalog =
+                    createPaimonFlinkCatalog(catalogName, catalogProperties, 
classLoader);
+        } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+            this.delegateCatalog =
+                    createIcebergFlinkCatalog(catalogName, catalogProperties, 
classLoader);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported lake format: "
+                            + lakeFormat
+                            + ". Only PAIMON and ICEBERG are supported.");
+        }
+    }
+
+    private Catalog createPaimonFlinkCatalog(
             String catalogName, Map<String, String> catalogProperties, 
ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new 
FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, 
classLoader);
+        try {
+            // Use Paimon's Flink catalog factory via reflection to avoid hard 
dependency
+            Class<?> catalogContextClass =
+                    
classLoader.loadClass("org.apache.paimon.catalog.CatalogContext");
+            Class<?> optionsClass = 
classLoader.loadClass("org.apache.paimon.options.Options");
+            Class<?> fileIOLoaderClass =
+                    
classLoader.loadClass("org.apache.paimon.flink.FlinkFileIOLoader");
+            Class<?> flinkCatalogFactoryClass =
+                    
classLoader.loadClass("org.apache.paimon.flink.FlinkCatalogFactory");
+
+            // Create Options.fromMap(catalogProperties)
+            Method fromMapMethod = optionsClass.getMethod("fromMap", 
Map.class);
+            Object options = fromMapMethod.invoke(null, catalogProperties);
+
+            // Create new FlinkFileIOLoader()
+            Object fileIOLoader = 
fileIOLoaderClass.getDeclaredConstructor().newInstance();
+
+            // Create CatalogContext.create(options, null, fileIOLoader)
+            // CatalogContext.create signature: create(Options, Configuration, 
FileIOLoader)
+            Class<?> hadoopConfigClass =
+                    
classLoader.loadClass("org.apache.hadoop.conf.Configuration");
+            Method createMethod =
+                    catalogContextClass.getMethod(
+                            "create", optionsClass, hadoopConfigClass, 
fileIOLoaderClass);
+            Object catalogContext = createMethod.invoke(null, options, null, 
fileIOLoader);
+
+            // Call FlinkCatalogFactory.createCatalog(catalogName, 
catalogContext, classLoader)
+            Method createCatalogMethod =
+                    flinkCatalogFactoryClass.getMethod(
+                            "createCatalog", String.class, 
catalogContextClass, ClassLoader.class);
+            return (Catalog)
+                    createCatalogMethod.invoke(null, catalogName, 
catalogContext, classLoader);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    "Failed to create Paimon Flink catalog. Make sure 
paimon-flink is on the classpath.",
+                    e);
+        }
     }
 
     public CatalogBaseTable getTable(ObjectPath objectPath)
             throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+        return delegateCatalog.getTable(objectPath);
+    }
+
+    private Catalog createIcebergFlinkCatalog(
+            String catalogName, Map<String, String> catalogProperties, 
ClassLoader classLoader) {
+        try {
+            // Use Iceberg's Flink catalog factory via reflection to avoid 
hard dependency
+            Class<?> icebergCatalogFactoryClass =
+                    
classLoader.loadClass("org.apache.iceberg.flink.FlinkCatalogFactory");
+            Object factoryInstance =
+                    
icebergCatalogFactoryClass.getDeclaredConstructor().newInstance();
+            Method createCatalogMethod =
+                    icebergCatalogFactoryClass.getMethod("createCatalog", 
String.class, Map.class);
+
+            // Prepare Iceberg catalog properties
+            // Iceberg expects 'catalog-type' instead of 'type' for the 
catalog implementation
+            Map<String, String> icebergProps = new 
java.util.HashMap<>(catalogProperties);

Review Comment:
   Use `HashMap` instead of the fully qualified `java.util.HashMap` for 
consistency with the rest of the codebase. The import statement should be added 
at the top of the file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to