Copilot commented on code in PR #1799:
URL: https://github.com/apache/fluss/pull/1799#discussion_r2426153082


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,94 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = new 
HashMap<>();

Review Comment:
   The static HashMap is not thread-safe and could lead to race conditions when 
multiple threads access getLakeCatalog() simultaneously. Consider using 
ConcurrentHashMap or adding synchronization.
   ```suggestion
   import java.util.concurrent.ConcurrentHashMap;
   
   import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
   import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
   
   /** A lake catalog to delegate the operations on lake table. */
   public class LakeCatalog {
       private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = 
new ConcurrentHashMap<>();
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,94 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = new 
HashMap<>();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        DataLakeFormat lakeFormat = 
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+        Map<String, String> catalogProperties =
+                DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+        if (lakeFormat == PAIMON && !LAKE_CATALOG_CACHE.containsKey(PAIMON)) {
+            LAKE_CATALOG_CACHE.put(
+                    PAIMON,
+                    PaimonCatalogFactory.create(catalogName, 
catalogProperties, classLoader));
+        } else if (lakeFormat == ICEBERG && 
!LAKE_CATALOG_CACHE.containsKey(ICEBERG)) {
+            LAKE_CATALOG_CACHE.put(
+                    ICEBERG, IcebergCatalogFactory.create(catalogName, 
catalogProperties));

Review Comment:
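   The trailing else branch is reached not only for unsupported formats but also 
for supported formats whose catalog is already in the cache: on a cache hit both 
`&& !LAKE_CATALOG_CACHE.containsKey(...)` conditions are false, so a second call 
for PAIMON or ICEBERG falls through and throws UnsupportedOperationException 
incorrectly. The logic should only throw for truly unsupported formats.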
   ```suggestion
           if (lakeFormat == PAIMON) {
               if (!LAKE_CATALOG_CACHE.containsKey(PAIMON)) {
                   LAKE_CATALOG_CACHE.put(
                           PAIMON,
                           PaimonCatalogFactory.create(catalogName, 
catalogProperties, classLoader));
               }
           } else if (lakeFormat == ICEBERG) {
               if (!LAKE_CATALOG_CACHE.containsKey(ICEBERG)) {
                   LAKE_CATALOG_CACHE.put(
                           ICEBERG, IcebergCatalogFactory.create(catalogName, 
catalogProperties));
               }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,94 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = new 
HashMap<>();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        DataLakeFormat lakeFormat = 
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+        Map<String, String> catalogProperties =
+                DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+        if (lakeFormat == PAIMON && !LAKE_CATALOG_CACHE.containsKey(PAIMON)) {
+            LAKE_CATALOG_CACHE.put(
+                    PAIMON,
+                    PaimonCatalogFactory.create(catalogName, 
catalogProperties, classLoader));
+        } else if (lakeFormat == ICEBERG && 
!LAKE_CATALOG_CACHE.containsKey(ICEBERG)) {
+            LAKE_CATALOG_CACHE.put(
+                    ICEBERG, IcebergCatalogFactory.create(catalogName, 
catalogProperties));
+        } else {
+            throw new UnsupportedOperationException("Unsupported datalake format: " + lakeFormat);
+        }

Review Comment:
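   The check-then-act operations (containsKey() followed by put()) on 
LAKE_CATALOG_CACHE are not atomic and could result in duplicate catalog creation 
or missing catalogs when accessed concurrently. Use putIfAbsent() or, 
preferably, computeIfAbsent() on a ConcurrentHashMap, or synchronize the entire 
block. Note that putIfAbsent() keeps the map consistent but still evaluates its 
argument eagerly, so by itself it does not prevent a duplicate catalog from 
being created; computeIfAbsent() on a ConcurrentHashMap applies the factory 
function at most once per key.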



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to