Copilot commented on code in PR #1799:
URL: https://github.com/apache/fluss/pull/1799#discussion_r2426153082
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,94 @@
package org.apache.fluss.flink.lake;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;
+import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.Map;
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
+ private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = new
HashMap<>();
Review Comment:
The static HashMap is not thread-safe and could lead to race conditions when
multiple threads access getLakeCatalog() simultaneously. Consider using
ConcurrentHashMap or adding synchronization.
```suggestion
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
new ConcurrentHashMap<>();
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,94 @@
package org.apache.fluss.flink.lake;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;
+import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.Map;
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
+ private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = new
HashMap<>();
+
+ private final String catalogName;
+ private final ClassLoader classLoader;
+
+ public LakeCatalog(String catalogName, ClassLoader classLoader) {
+ this.catalogName = catalogName;
+ this.classLoader = classLoader;
+ }
+
+ public Catalog getLakeCatalog(Configuration tableOptions) {
+ DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ if (lakeFormat == PAIMON && !LAKE_CATALOG_CACHE.containsKey(PAIMON)) {
+ LAKE_CATALOG_CACHE.put(
+ PAIMON,
+ PaimonCatalogFactory.create(catalogName,
catalogProperties, classLoader));
+ } else if (lakeFormat == ICEBERG &&
!LAKE_CATALOG_CACHE.containsKey(ICEBERG)) {
+ LAKE_CATALOG_CACHE.put(
+ ICEBERG, IcebergCatalogFactory.create(catalogName,
catalogProperties));
Review Comment:
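When the requested format is supported but its catalog is already cached,
neither the PAIMON nor the ICEBERG condition matches (each also requires
`!LAKE_CATALOG_CACHE.containsKey(...)`), so control falls through to the
trailing else clause and an UnsupportedOperationException is thrown
incorrectly. Separate the format check from the cache check so that the else
clause only throws for truly unsupported formats.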
```suggestion
if (lakeFormat == PAIMON) {
if (!LAKE_CATALOG_CACHE.containsKey(PAIMON)) {
LAKE_CATALOG_CACHE.put(
PAIMON,
PaimonCatalogFactory.create(catalogName,
catalogProperties, classLoader));
}
} else if (lakeFormat == ICEBERG) {
if (!LAKE_CATALOG_CACHE.containsKey(ICEBERG)) {
LAKE_CATALOG_CACHE.put(
ICEBERG, IcebergCatalogFactory.create(catalogName,
catalogProperties));
}
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,94 @@
package org.apache.fluss.flink.lake;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;
+import java.lang.reflect.Method;
+import java.util.HashMap;
import java.util.Map;
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
+ private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE = new
HashMap<>();
+
+ private final String catalogName;
+ private final ClassLoader classLoader;
+
+ public LakeCatalog(String catalogName, ClassLoader classLoader) {
+ this.catalogName = catalogName;
+ this.classLoader = classLoader;
+ }
+
+ public Catalog getLakeCatalog(Configuration tableOptions) {
+ DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ if (lakeFormat == PAIMON && !LAKE_CATALOG_CACHE.containsKey(PAIMON)) {
+ LAKE_CATALOG_CACHE.put(
+ PAIMON,
+ PaimonCatalogFactory.create(catalogName,
catalogProperties, classLoader));
+ } else if (lakeFormat == ICEBERG &&
!LAKE_CATALOG_CACHE.containsKey(ICEBERG)) {
+ LAKE_CATALOG_CACHE.put(
+ ICEBERG, IcebergCatalogFactory.create(catalogName,
catalogProperties));
+ } else {
+ throw new UnsupportedOperationException("Unsupported datalake
format: " + lakeFormat);
+ }
Review Comment:
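The check-then-act operations on LAKE_CATALOG_CACHE (containsKey() followed by
put()) are not atomic and could result in duplicate catalog creation or missing
catalogs when accessed concurrently. Prefer computeIfAbsent() on a
ConcurrentHashMap so that the factory runs at most once per format, or
synchronize the entire block. Note that putIfAbsent() alone keeps the map
consistent but still evaluates the factory call eagerly, so a duplicate catalog
can still be created and then discarded.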
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]