This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 968d3a4df [flink] Supports get iceberg lake table (#1799)
968d3a4df is described below
commit 968d3a4df84b36ceb75b164e987f8d010e4f5c8b
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Oct 14 10:40:28 2025 +0800
[flink] Supports get iceberg lake table (#1799)
---------
Co-authored-by: luoyuxia <[email protected]>
---
.../apache/fluss/flink/catalog/FlinkCatalog.java | 34 ++----
.../org/apache/fluss/flink/lake/LakeCatalog.java | 118 +++++++++++++++++----
fluss-lake/fluss-lake-iceberg/pom.xml | 7 +-
.../lake/iceberg/flink/FlinkCatalogLakeTest.java | 108 +++++++++++++++++++
4 files changed, 219 insertions(+), 48 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index eaed50b1d..6fa779387 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -22,12 +22,10 @@ import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.flink.lake.LakeCatalog;
import org.apache.fluss.flink.procedure.ProcedureManager;
import org.apache.fluss.flink.utils.CatalogExceptionUtils;
-import org.apache.fluss.flink.utils.DataLakeUtils;
import org.apache.fluss.flink.utils.FlinkConversions;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.PartitionInfo;
@@ -74,8 +72,6 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -118,9 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
protected final String defaultDatabase;
protected final String bootstrapServers;
protected final Map<String, String> securityConfigs;
+ private final LakeCatalog lakeCatalog;
protected Connection connection;
protected Admin admin;
- private volatile @Nullable LakeCatalog lakeCatalog;
public FlinkCatalog(
String name,
@@ -134,6 +130,7 @@ public class FlinkCatalog extends AbstractCatalog {
this.bootstrapServers = bootstrapServers;
this.classLoader = classLoader;
this.securityConfigs = securityConfigs;
+ this.lakeCatalog = new LakeCatalog(catalogName, classLoader);
}
@Override
@@ -334,16 +331,17 @@ public class FlinkCatalog extends AbstractCatalog {
protected CatalogBaseTable getLakeTable(
String databaseName, String tableName, Configuration properties)
throws TableNotExistException, CatalogException {
- mayInitLakeCatalogCatalog(properties);
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
if (tableComponents.length == 1) {
// should be pattern like table_name$lake
tableName = tableComponents[0];
} else {
- // be some thing like table_name$lake$snapshot
+ // be something like table_name$lake$snapshot
tableName = String.join("", tableComponents);
}
- return lakeCatalog.getTable(new ObjectPath(databaseName, tableName));
+ return lakeCatalog
+ .getLakeCatalog(properties)
+ .getTable(new ObjectPath(databaseName, tableName));
}
@Override
@@ -751,26 +749,6 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
- private void mayInitLakeCatalogCatalog(Configuration tableOptions) {
- // TODO: Currently, a Fluss cluster only supports a single DataLake
storage. However, in the
- // future, it may support multiple DataLakes. The following code
assumes that a single
- // lakeCatalog is shared across multiple tables, which will no longer
be valid in such
- // cases and should be updated accordingly.
- if (lakeCatalog == null) {
- synchronized (this) {
- if (lakeCatalog == null) {
- try {
- Map<String, String> catalogProperties =
-
DataLakeUtils.extractLakeCatalogProperties(tableOptions);
- lakeCatalog = new LakeCatalog(catalogName,
catalogProperties, classLoader);
- } catch (Exception e) {
- throw new FlussRuntimeException("Failed to init paimon
catalog.", e);
- }
- }
- }
- }
- }
-
@VisibleForTesting
public Map<String, String> getSecurityConfigs() {
return securityConfigs;
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
index 447616106..42f5e7008 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
@@ -17,36 +17,116 @@
package org.apache.fluss.flink.lake;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;
+import java.lang.reflect.Method;
import java.util.Map;
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {
+ private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+ MapUtils.newConcurrentHashMap();
+
+ private final String catalogName;
+ private final ClassLoader classLoader;
+
+ public LakeCatalog(String catalogName, ClassLoader classLoader) {
+ this.catalogName = catalogName;
+ this.classLoader = classLoader;
+ }
+
+ public Catalog getLakeCatalog(Configuration tableOptions) {
+ DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+ // TODO: Currently, a Fluss cluster only supports a single DataLake
storage.
+ // However, in the
+ // future, it may support multiple DataLakes. The following code
assumes
+ // that a single
+ // lakeCatalog is shared across multiple tables, which will no longer
be
+ // valid in such
+ // cases and should be updated accordingly.
+ return LAKE_CATALOG_CACHE.computeIfAbsent(
+ lakeFormat,
+ (dataLakeFormat) -> {
+ if (dataLakeFormat == PAIMON) {
+ return PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
+ } else if (dataLakeFormat == ICEBERG) {
+ return IcebergCatalogFactory.create(catalogName,
tableOptions);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported datalake format: " +
dataLakeFormat);
+ }
+ });
+ }
+
+ /**
+ * Factory for creating Paimon Catalog instances.
+ *
+ * <p>Purpose: Encapsulates Paimon-related dependencies (e.g.
FlinkFileIOLoader) to avoid direct
+ * dependency in the main LakeCatalog class.
+ */
+ public static class PaimonCatalogFactory {
+
+ private PaimonCatalogFactory() {}
- // currently, only support paimon
- // todo make it pluggable
- private final FlinkCatalog paimonFlinkCatalog;
-
- public LakeCatalog(
- String catalogName, Map<String, String> catalogProperties,
ClassLoader classLoader) {
- CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(catalogProperties), null, new
FlinkFileIOLoader());
- paimonFlinkCatalog =
- FlinkCatalogFactory.createCatalog(catalogName, catalogContext,
classLoader);
+ public static Catalog create(
+ String catalogName, Configuration tableOptions, ClassLoader
classLoader) {
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ return FlinkCatalogFactory.createCatalog(
+ catalogName,
+ CatalogContext.create(
+ Options.fromMap(catalogProperties), null, new
FlinkFileIOLoader()),
+ classLoader);
+ }
}
- public CatalogBaseTable getTable(ObjectPath objectPath)
- throws TableNotExistException, CatalogException {
- return paimonFlinkCatalog.getTable(objectPath);
+ /** Factory that uses reflection to create Iceberg Catalog instances. */
+ public static class IcebergCatalogFactory {
+
+ private IcebergCatalogFactory() {}
+
+ // Iceberg 1.4.3 is the last Java 8 compatible version, while the
Flink 1.18+ connector
+ // requires Iceberg 1.5.0+.
+ // Using reflection to maintain Java 8 compatibility.
+ // Once Fluss drops Java 8, we can remove the reflection code
+ public static Catalog create(String catalogName, Configuration
tableOptions) {
+ Map<String, String> catalogProperties =
+ DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ // Map "type" to "catalog-type" (equivalent)
+ // Required: either "catalog-type" (standard type) or
"catalog-impl"
+ // (fully-qualified custom class, mandatory if "catalog-type" is
missing)
+ if (catalogProperties.containsKey("type")) {
+ catalogProperties.put("catalog-type",
catalogProperties.get("type"));
+ }
+ try {
+ Class<?> flinkCatalogFactoryClass =
+
Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+ Object factoryInstance =
+
flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+ Method createCatalogMethod =
+ flinkCatalogFactoryClass.getMethod(
+ "createCatalog", String.class, Map.class);
+ return (Catalog)
+ createCatalogMethod.invoke(factoryInstance,
catalogName, catalogProperties);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create Iceberg catalog using reflection.
Please make sure iceberg-flink-runtime is on the classpath.",
+ e);
+ }
+ }
}
}
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index de4f5ab0e..f1d195029 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -236,7 +236,12 @@
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-${flink.major.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
new file mode 100644
index 000000000..f1305b99d
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link FlinkCatalog}. */
+class FlinkCatalogLakeTest extends FlinkIcebergTieringTestBase {
+
+ protected static final String DEFAULT_DB = "fluss";
+
+ protected static final String CATALOG_NAME = "test_iceberg_lake";
+
+ FlinkCatalog catalog;
+
+ @BeforeEach
+ public void beforeEach() {
+ super.beforeEach();
+ buildCatalog();
+ }
+
+ @Test
+ // TODO: remove this test in #1803
+ void testGetLakeTable() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(TABLE_DATALAKE_ENABLED.key(), "true");
+ ObjectPath lakeTablePath = new ObjectPath(DEFAULT_DB, "lake_table");
+ CatalogTable table = this.newCatalogTable(options);
+ catalog.createTable(lakeTablePath, table, false);
+ assertThat(catalog.tableExists(lakeTablePath)).isTrue();
+ CatalogBaseTable lakeTable =
+ catalog.getTable(new ObjectPath(DEFAULT_DB,
"lake_table$lake"));
+ Schema schema = lakeTable.getUnresolvedSchema();
+ assertThat(schema.getColumns().size()).isEqualTo(3 +
SYSTEM_COLUMNS.size());
+ assertThat(schema.getPrimaryKey().isPresent()).isTrue();
+
assertThat(schema.getPrimaryKey().get().getColumnNames()).isEqualTo(List.of("first"));
+ }
+
+ private CatalogTable newCatalogTable(Map<String, String> options) {
+ ResolvedSchema resolvedSchema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("first",
DataTypes.STRING().notNull()),
+ Column.physical("second", DataTypes.INT()),
+ Column.physical("third",
DataTypes.STRING().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("PK_first",
List.of("first")));
+ CatalogTable origin =
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+ "test comment",
+ Collections.emptyList(),
+ options);
+ return new ResolvedCatalogTable(origin, resolvedSchema);
+ }
+
+ public void buildCatalog() {
+ String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ catalog =
+ new FlinkCatalog(
+ CATALOG_NAME,
+ DEFAULT_DB,
+ bootstrapServers,
+ Thread.currentThread().getContextClassLoader(),
+ Collections.emptyMap());
+ catalog.open();
+ }
+}