luoyuxia commented on code in PR #1799:
URL: https://github.com/apache/fluss/pull/1799#discussion_r2426262894


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,96 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
 import java.util.Map;
 
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+            MapUtils.newConcurrentHashMap();
 
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
-
-    public LakeCatalog(
-            String catalogName, Map<String, String> catalogProperties, 
ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new 
FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, 
classLoader);
    // Name under which the delegated lake catalogs are created.
    private final String catalogName;
    // Class loader used to instantiate lake catalog implementations (e.g. Paimon's Flink catalog).
    private final ClassLoader classLoader;

    /**
     * Creates a lake catalog delegate; the underlying lake-specific catalog is created lazily on
     * first access.
     *
     * @param catalogName name used when creating the underlying lake catalogs
     * @param classLoader class loader used to load the lake catalog implementations
     */
    public LakeCatalog(String catalogName, ClassLoader classLoader) {
        this.catalogName = catalogName;
        this.classLoader = classLoader;
    }
 
-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        DataLakeFormat lakeFormat = 
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+        Map<String, String> catalogProperties =
+                DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+        if (lakeFormat == PAIMON) {
+            LAKE_CATALOG_CACHE.computeIfAbsent(
+                    PAIMON,
+                    k -> PaimonCatalogFactory.create(catalogName, 
catalogProperties, classLoader));
+
+        } else if (lakeFormat == ICEBERG) {
+            LAKE_CATALOG_CACHE.computeIfAbsent(
+                    ICEBERG, k -> IcebergCatalogFactory.create(catalogName, 
catalogProperties));
+        } else {
+            throw new UnsupportedOperationException("Unsupported datalake 
format: " + lakeFormat);
+        }
+        return LAKE_CATALOG_CACHE.get(lakeFormat);
+    }
+
+    /**
+     * Factory for creating Paimon Catalog instances.
+     *
+     * <p>Purpose: Encapsulates Paimon-related dependencies (e.g. 
FlinkFileIOLoader) to avoid direct
+     * dependency in the main LakeCatalog class.
+     */
+    public static class PaimonCatalogFactory {
+
+        private PaimonCatalogFactory() {}
+
+        public static Catalog create(
+                String catalogName, Map<String, String> properties, 
ClassLoader classLoader) {
+            return FlinkCatalogFactory.createCatalog(
+                    catalogName,
+                    CatalogContext.create(
+                            Options.fromMap(properties), null, new 
FlinkFileIOLoader()),
+                    classLoader);
+        }
+    }
+
+    /** Factory use reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        public static Catalog create(String catalogName, Map<String, String> 
properties) {
+            properties.put("catalog-type", properties.get("type"));
+            try {
+                Class<?> flinkCatalogFactoryClass =
+                        
Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+                Object factoryInstance =
+                        
flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+                Method createCatalogMethod =
+                        flinkCatalogFactoryClass.getMethod(
+                                "createCatalog", String.class, Map.class);
+                return (Catalog)
+                        createCatalogMethod.invoke(factoryInstance, 
catalogName, properties);
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to create Iceberg catalog 
using reflection", e);

Review Comment:
   ```suggestion
                   throw new RuntimeException("Failed to create Iceberg catalog 
using reflection. Please make sure iceberg-flink-runtime is on the classpath.", 
e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to