Copilot commented on code in PR #1799:
URL: https://github.com/apache/fluss/pull/1799#discussion_r2427698073


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link FlinkCatalog}. */
+class FlinkCatalogLakeTest extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    protected static final String CATALOG_NAME = "test_iceberg_lake";
+
+    FlinkCatalog catalog;
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        buildCatalog();
+    }
+
+    @Test
+    // TODO: remove this test in #1803

Review Comment:
   The TODO comment references issue #1803 but provides no context about why this test should be removed or what #1803 addresses. Consider adding a brief explanation.
   ```suggestion
    // TODO: remove this test in #1803. Issue #1803 tracks the refactoring of lake
    // table handling; this test will become obsolete once lake table logic is
    // moved to the new implementation.
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,128 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+            MapUtils.newConcurrentHashMap();
+
+    private static final ReentrantLock LOCK = new ReentrantLock();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        return inLock(
+                LOCK,
+                () -> {
+                    DataLakeFormat lakeFormat =
+                            tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+                    if (lakeFormat == PAIMON) {
+                        // TODO: Currently, a Fluss cluster only supports a single DataLake
+                        // storage. However, in the future, it may support multiple DataLakes.
+                        // The following code assumes that a single lakeCatalog is shared across
+                        // multiple tables, which will no longer be valid in such cases and
+                        // should be updated accordingly.
+                        LAKE_CATALOG_CACHE.computeIfAbsent(
+                                PAIMON,
+                                k ->
+                                        PaimonCatalogFactory.create(
+                                                catalogName, tableOptions, classLoader));
 
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
-
-    public LakeCatalog(
-            String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader);
+                    } else if (lakeFormat == ICEBERG) {
+                        LAKE_CATALOG_CACHE.computeIfAbsent(
+                                ICEBERG,
+                                k -> IcebergCatalogFactory.create(catalogName, tableOptions));
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + lakeFormat);
+                    }
+                    return LAKE_CATALOG_CACHE.get(lakeFormat);
+                });
     }
 
-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+    /**
+     * Factory for creating Paimon Catalog instances.
+     *
+     * <p>Purpose: Encapsulates Paimon-related dependencies (e.g. FlinkFileIOLoader) to avoid direct
+     * dependency in the main LakeCatalog class.
+     */
+    public static class PaimonCatalogFactory {
+
+        private PaimonCatalogFactory() {}
+
+        public static Catalog create(
+                String catalogName, Configuration tableOptions, ClassLoader classLoader) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            return FlinkCatalogFactory.createCatalog(
+                    catalogName,
+                    CatalogContext.create(
+                            Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()),
+                    classLoader);
+        }
+    }
+
+    /** Factory using reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        // Iceberg 1.4.3 is the last Java 8 compatible version, while Flink Iceberg 1.18+ requires
+        // 1.5.0+.
+        // Using reflection to maintain Java 8 compatibility.
+        public static Catalog create(String catalogName, Configuration tableOptions) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            // Map "type" to "catalog-type" (equivalent)
+            // Required: either "catalog-type" (standard type) or "catalog-impl"
+            // (fully-qualified custom class, mandatory if "catalog-type" is missing)
+            if (catalogProperties.containsKey("type")) {
+                catalogProperties.put("catalog-type", catalogProperties.get("type"));
+            }
+            try {
+                Class<?> flinkCatalogFactoryClass =
+                        Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+                Object factoryInstance =
+                        flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+                Method createCatalogMethod =
+                        flinkCatalogFactoryClass.getMethod(
+                                "createCatalog", String.class, Map.class);
+                return (Catalog)
+                        createCatalogMethod.invoke(factoryInstance, catalogName, catalogProperties);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to create Iceberg catalog using reflection. Please make sure iceberg-flink-runtime is on the classpath.",

Review Comment:
   The error message mentions 'iceberg-flink-runtime' but should be more specific about the exact artifact name. Based on the pom.xml, it should reference 'iceberg-flink-${flink.major.version}' or provide the full artifact coordinates.
   ```suggestion
                        "Failed to create Iceberg catalog using reflection. Please make sure the correct Iceberg Flink connector is on the classpath (e.g., 'org.apache.iceberg:iceberg-flink-${flink.major.version}:<version>'). Refer to your Flink version and the Iceberg documentation for the exact artifact coordinates.",
   ```
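
   For illustration, a fail-fast variant of the same idea: probing for the connector class up front surfaces a missing dependency with the artifact hint before any reflective instantiation is attempted. This is a minimal sketch, not part of the PR; the helper name and message wording are hypothetical, and the artifact coordinates follow the suggestion above.
   ```java
    // Hypothetical helper (not in the PR): verify the Iceberg Flink connector is
    // present before using reflection, so the failure carries an actionable hint.
    static void ensureIcebergFlinkOnClasspath() {
        try {
            // Same class the reflective factory resolves at runtime.
            Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(
                    "Iceberg Flink connector not found on the classpath. Add "
                            + "'org.apache.iceberg:iceberg-flink-<flink.major.version>:<version>'.",
                    e);
        }
    }
   ```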



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,128 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+            MapUtils.newConcurrentHashMap();
+
+    private static final ReentrantLock LOCK = new ReentrantLock();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        return inLock(
+                LOCK,
+                () -> {
+                    DataLakeFormat lakeFormat =
+                            tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+                    if (lakeFormat == PAIMON) {
+                        // TODO: Currently, a Fluss cluster only supports a single DataLake
+                        // storage. However, in the future, it may support multiple DataLakes.
+                        // The following code assumes that a single lakeCatalog is shared across
+                        // multiple tables, which will no longer be valid in such cases and
+                        // should be updated accordingly.
+                        LAKE_CATALOG_CACHE.computeIfAbsent(
+                                PAIMON,
+                                k ->
+                                        PaimonCatalogFactory.create(
+                                                catalogName, tableOptions, classLoader));
 
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
-
-    public LakeCatalog(
-            String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader);
+                    } else if (lakeFormat == ICEBERG) {
+                        LAKE_CATALOG_CACHE.computeIfAbsent(
+                                ICEBERG,
+                                k -> IcebergCatalogFactory.create(catalogName, tableOptions));
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + lakeFormat);
+                    }
+                    return LAKE_CATALOG_CACHE.get(lakeFormat);
+                });
     }
 
-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+    /**
+     * Factory for creating Paimon Catalog instances.
+     *
+     * <p>Purpose: Encapsulates Paimon-related dependencies (e.g. FlinkFileIOLoader) to avoid direct
+     * dependency in the main LakeCatalog class.
+     */
+    public static class PaimonCatalogFactory {
+
+        private PaimonCatalogFactory() {}
+
+        public static Catalog create(
+                String catalogName, Configuration tableOptions, ClassLoader classLoader) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            return FlinkCatalogFactory.createCatalog(
+                    catalogName,
+                    CatalogContext.create(
+                            Options.fromMap(catalogProperties), null, new FlinkFileIOLoader()),
+                    classLoader);
+        }
+    }
+
+    /** Factory using reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        // Iceberg 1.4.3 is the last Java 8 compatible version, while Flink Iceberg 1.18+ requires
+        // 1.5.0+.

Review Comment:
   The comment mentions 'Flink Iceberg 1.18+' but it's unclear what this refers to. Consider clarifying whether this means Flink version 1.18+ or an Iceberg connector version.
   ```suggestion
        // Iceberg 1.4.3 is the last Java 8 compatible version, while the Iceberg Flink connector for Flink 1.18+ requires
        // Iceberg 1.5.0+.
   ```
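
   As background for this point: the reflection indirection exists so the module carries no compile-time dependency on the Iceberg connector at all, which is what preserves Java 8 compatibility. Below is a condensed, self-contained sketch of the pattern from the hunk above; the class and method names are taken from the diff, everything else (class name, return type) is illustrative.
   ```java
    import java.lang.reflect.Method;
    import java.util.Map;

    final class IcebergReflectionSketch {
        // The Iceberg factory is referenced only by name, so this file compiles
        // and runs on Java 8 without iceberg-flink on the compile classpath; the
        // class is loaded lazily, only when an Iceberg catalog is actually needed.
        static Object createCatalog(String name, Map<String, String> properties) throws Exception {
            Class<?> factoryClass = Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
            Object factory = factoryClass.getDeclaredConstructor().newInstance();
            Method createCatalog = factoryClass.getMethod("createCatalog", String.class, Map.class);
            return createCatalog.invoke(factory, name, properties);
        }
    }
   ```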



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
