Copilot commented on code in PR #1799:
URL: https://github.com/apache/fluss/pull/1799#discussion_r2427708935
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,116 @@
package org.apache.fluss.flink.lake;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;
+import java.lang.reflect.Method;
import java.util.Map;
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
+ private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+ MapUtils.newConcurrentHashMap();
+
+ private final String catalogName;
+ private final ClassLoader classLoader;
+
+ public LakeCatalog(String catalogName, ClassLoader classLoader) {
+ this.catalogName = catalogName;
+ this.classLoader = classLoader;
+ }
+
+ public Catalog getLakeCatalog(Configuration tableOptions) {
+ DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+ // TODO: Currently, a Fluss cluster only supports a single DataLake
storage.
+ // However, in the
+ // future, it may support multiple DataLakes. The following code
assumes
+ // that a single
+ // lakeCatalog is shared across multiple tables, which will no longer
be
+ // valid in such
+ // cases and should be updated accordingly.
+ return LAKE_CATALOG_CACHE.computeIfAbsent(
+ lakeFormat,
+ (dataLakeFormat) -> {
+ if (dataLakeFormat == PAIMON) {
+ return PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
+ } else if (dataLakeFormat == ICEBERG) {
+ return IcebergCatalogFactory.create(catalogName,
tableOptions);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported datalake format: " +
dataLakeFormat);
+ }
+ });
+ }
+
+ /**
+ * Factory for creating Paimon Catalog instances.
+ *
+ * <p>Purpose: Encapsulates Paimon-related dependencies (e.g.
FlinkFileIOLoader) to avoid direct
+ * dependency in the main LakeCatalog class.
+ */
+ public static class PaimonCatalogFactory {
+
+ private PaimonCatalogFactory() {}
- // currently, only support paimon
- // todo make it pluggable
- private final FlinkCatalog paimonFlinkCatalog;
-
- public LakeCatalog(
- String catalogName, Map<String, String> catalogProperties,
ClassLoader classLoader) {
- CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(catalogProperties), null, new
FlinkFileIOLoader());
- paimonFlinkCatalog =
- FlinkCatalogFactory.createCatalog(catalogName, catalogContext,
classLoader);
+ public static Catalog create(
+ String catalogName, Configuration tableOptions, ClassLoader
classLoader) {
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ return FlinkCatalogFactory.createCatalog(
+ catalogName,
+ CatalogContext.create(
+ Options.fromMap(catalogProperties), null, new
FlinkFileIOLoader()),
+ classLoader);
+ }
}
- public CatalogBaseTable getTable(ObjectPath objectPath)
- throws TableNotExistException, CatalogException {
- return paimonFlinkCatalog.getTable(objectPath);
+ /** Factory use reflection to create Iceberg Catalog instances. */
+ public static class IcebergCatalogFactory {
+
+ private IcebergCatalogFactory() {}
+
+ // Iceberg 1.4.3 is the last Java 8 compatible version, while the
Iceberg Flink connector
+ // version 1.18+ requires Iceberg 1.5.0+.
+ // Using reflection to maintain Java 8 compatibility.
+ // Once Fluss drops Java 8, we can remove the reflection code
+ public static Catalog create(String catalogName, Configuration
tableOptions) {
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ // Map "type" to "catalog-type" (equivalent)
+ // Required: either "catalog-type" (standard type) or
"catalog-impl"
+ // (fully-qualified custom class, mandatory if "catalog-type" is
missing)
+ if (catalogProperties.containsKey("type")) {
+ catalogProperties.put("catalog-type",
catalogProperties.get("type"));
+ }
+ try {
+ Class<?> flinkCatalogFactoryClass =
+
Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+ Object factoryInstance =
+
flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+ Method createCatalogMethod =
+ flinkCatalogFactoryClass.getMethod(
+ "createCatalog", String.class, Map.class);
+ return (Catalog)
+ createCatalogMethod.invoke(factoryInstance,
catalogName, catalogProperties);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create Iceberg catalog using reflection.
Please make sure iceberg-flink-runtime is on the classpath.",
Review Comment:
The error message mentions 'iceberg-flink-runtime', but based on the pom.xml
changes, the actual dependency is 'iceberg-flink-${flink.major.version}'.
Consider updating the error message to match the actual artifact name, or
using a more generic description.
```suggestion
"Failed to create Iceberg catalog using reflection.
Please make sure the appropriate Iceberg Flink connector is on the classpath.",
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java:
##########
@@ -17,36 +17,116 @@
package org.apache.fluss.flink.lake;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;
+import java.lang.reflect.Method;
import java.util.Map;
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
+ private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+ MapUtils.newConcurrentHashMap();
+
+ private final String catalogName;
+ private final ClassLoader classLoader;
+
+ public LakeCatalog(String catalogName, ClassLoader classLoader) {
+ this.catalogName = catalogName;
+ this.classLoader = classLoader;
+ }
+
+ public Catalog getLakeCatalog(Configuration tableOptions) {
+ DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+ // TODO: Currently, a Fluss cluster only supports a single DataLake
storage.
+ // However, in the
+ // future, it may support multiple DataLakes. The following code
assumes
+ // that a single
+ // lakeCatalog is shared across multiple tables, which will no longer
be
+ // valid in such
+ // cases and should be updated accordingly.
+ return LAKE_CATALOG_CACHE.computeIfAbsent(
+ lakeFormat,
+ (dataLakeFormat) -> {
+ if (dataLakeFormat == PAIMON) {
+ return PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
+ } else if (dataLakeFormat == ICEBERG) {
+ return IcebergCatalogFactory.create(catalogName,
tableOptions);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported datalake format: " +
dataLakeFormat);
+ }
+ });
+ }
+
+ /**
+ * Factory for creating Paimon Catalog instances.
+ *
+ * <p>Purpose: Encapsulates Paimon-related dependencies (e.g.
FlinkFileIOLoader) to avoid direct
+ * dependency in the main LakeCatalog class.
+ */
+ public static class PaimonCatalogFactory {
+
+ private PaimonCatalogFactory() {}
- // currently, only support paimon
- // todo make it pluggable
- private final FlinkCatalog paimonFlinkCatalog;
-
- public LakeCatalog(
- String catalogName, Map<String, String> catalogProperties,
ClassLoader classLoader) {
- CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(catalogProperties), null, new
FlinkFileIOLoader());
- paimonFlinkCatalog =
- FlinkCatalogFactory.createCatalog(catalogName, catalogContext,
classLoader);
+ public static Catalog create(
+ String catalogName, Configuration tableOptions, ClassLoader
classLoader) {
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ return FlinkCatalogFactory.createCatalog(
+ catalogName,
+ CatalogContext.create(
+ Options.fromMap(catalogProperties), null, new
FlinkFileIOLoader()),
+ classLoader);
+ }
}
- public CatalogBaseTable getTable(ObjectPath objectPath)
- throws TableNotExistException, CatalogException {
- return paimonFlinkCatalog.getTable(objectPath);
+ /** Factory use reflection to create Iceberg Catalog instances. */
+ public static class IcebergCatalogFactory {
+
+ private IcebergCatalogFactory() {}
+
+ // Iceberg 1.4.3 is the last Java 8 compatible version, while the
Iceberg Flink connector
+ // version 1.18+ requires Iceberg 1.5.0+.
Review Comment:
The comment reads as if '1.18+' were an Iceberg version, but it appears to
refer to the Flink version. Consider rewording it to 'Flink 1.18+ connector'
to avoid confusion with Iceberg versions.
```suggestion
// Iceberg 1.4.3 is the last Java 8 compatible version, while the
Flink 1.18+ connector
// requires Iceberg 1.5.0+.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]