This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 968d3a4df [flink] Supports get iceberg lake table (#1799)
968d3a4df is described below

commit 968d3a4df84b36ceb75b164e987f8d010e4f5c8b
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Oct 14 10:40:28 2025 +0800

    [flink] Supports get iceberg lake table (#1799)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |  34 ++----
 .../org/apache/fluss/flink/lake/LakeCatalog.java   | 118 +++++++++++++++++----
 fluss-lake/fluss-lake-iceberg/pom.xml              |   7 +-
 .../lake/iceberg/flink/FlinkCatalogLakeTest.java   | 108 +++++++++++++++++++
 4 files changed, 219 insertions(+), 48 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index eaed50b1d..6fa779387 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -22,12 +22,10 @@ import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.flink.lake.LakeCatalog;
 import org.apache.fluss.flink.procedure.ProcedureManager;
 import org.apache.fluss.flink.utils.CatalogExceptionUtils;
-import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.PartitionInfo;
@@ -74,8 +72,6 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -118,9 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String defaultDatabase;
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
+    private final LakeCatalog lakeCatalog;
     protected Connection connection;
     protected Admin admin;
-    private volatile @Nullable LakeCatalog lakeCatalog;
 
     public FlinkCatalog(
             String name,
@@ -134,6 +130,7 @@ public class FlinkCatalog extends AbstractCatalog {
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
+        this.lakeCatalog = new LakeCatalog(catalogName, classLoader);
     }
 
     @Override
@@ -334,16 +331,17 @@ public class FlinkCatalog extends AbstractCatalog {
     protected CatalogBaseTable getLakeTable(
             String databaseName, String tableName, Configuration properties)
             throws TableNotExistException, CatalogException {
-        mayInitLakeCatalogCatalog(properties);
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
             // should be pattern like table_name$lake
             tableName = tableComponents[0];
         } else {
-            // be some thing like table_name$lake$snapshot
+            // be something like table_name$lake$snapshot
             tableName = String.join("", tableComponents);
         }
-        return lakeCatalog.getTable(new ObjectPath(databaseName, tableName));
+        return lakeCatalog
+                .getLakeCatalog(properties)
+                .getTable(new ObjectPath(databaseName, tableName));
     }
 
     @Override
@@ -751,26 +749,6 @@ public class FlinkCatalog extends AbstractCatalog {
         }
     }
 
-    private void mayInitLakeCatalogCatalog(Configuration tableOptions) {
-        // TODO: Currently, a Fluss cluster only supports a single DataLake 
storage. However, in the
-        //  future, it may support multiple DataLakes. The following code 
assumes that a single
-        //  lakeCatalog is shared across multiple tables, which will no longer 
be valid in such
-        //  cases and should be updated accordingly.
-        if (lakeCatalog == null) {
-            synchronized (this) {
-                if (lakeCatalog == null) {
-                    try {
-                        Map<String, String> catalogProperties =
-                                
DataLakeUtils.extractLakeCatalogProperties(tableOptions);
-                        lakeCatalog = new LakeCatalog(catalogName, 
catalogProperties, classLoader);
-                    } catch (Exception e) {
-                        throw new FlussRuntimeException("Failed to init paimon 
catalog.", e);
-                    }
-                }
-            }
-        }
-    }
-
     @VisibleForTesting
     public Map<String, String> getSecurityConfigs() {
         return securityConfigs;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
index 447616106..42f5e7008 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
@@ -17,36 +17,116 @@
 
 package org.apache.fluss.flink.lake;
 
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.utils.DataLakeUtils;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkFileIOLoader;
 import org.apache.paimon.options.Options;
 
+import java.lang.reflect.Method;
 import java.util.Map;
 
+import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+
 /** A lake catalog to delegate the operations on lake table. */
 public class LakeCatalog {
+    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
+            MapUtils.newConcurrentHashMap();
+
+    private final String catalogName;
+    private final ClassLoader classLoader;
+
+    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+        this.catalogName = catalogName;
+        this.classLoader = classLoader;
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions) {
+        DataLakeFormat lakeFormat = 
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+        // TODO: Currently, a Fluss cluster only supports a single DataLake 
storage.
+        // However, in the
+        //  future, it may support multiple DataLakes. The following code 
assumes
+        // that a single
+        //  lakeCatalog is shared across multiple tables, which will no longer 
be
+        // valid in such
+        //  cases and should be updated accordingly.
+        return LAKE_CATALOG_CACHE.computeIfAbsent(
+                lakeFormat,
+                (dataLakeFormat) -> {
+                    if (dataLakeFormat == PAIMON) {
+                        return PaimonCatalogFactory.create(catalogName, 
tableOptions, classLoader);
+                    } else if (dataLakeFormat == ICEBERG) {
+                        return IcebergCatalogFactory.create(catalogName, 
tableOptions);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + 
dataLakeFormat);
+                    }
+                });
+    }
+
+    /**
+     * Factory for creating Paimon Catalog instances.
+     *
+     * <p>Purpose: Encapsulates Paimon-related dependencies (e.g. 
FlinkFileIOLoader) to avoid direct
+     * dependency in the main LakeCatalog class.
+     */
+    public static class PaimonCatalogFactory {
+
+        private PaimonCatalogFactory() {}
 
-    // currently, only support paimon
-    // todo make it pluggable
-    private final FlinkCatalog paimonFlinkCatalog;
-
-    public LakeCatalog(
-            String catalogName, Map<String, String> catalogProperties, 
ClassLoader classLoader) {
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(catalogProperties), null, new 
FlinkFileIOLoader());
-        paimonFlinkCatalog =
-                FlinkCatalogFactory.createCatalog(catalogName, catalogContext, 
classLoader);
+        public static Catalog create(
+                String catalogName, Configuration tableOptions, ClassLoader 
classLoader) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            return FlinkCatalogFactory.createCatalog(
+                    catalogName,
+                    CatalogContext.create(
+                            Options.fromMap(catalogProperties), null, new 
FlinkFileIOLoader()),
+                    classLoader);
+        }
     }
 
-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        return paimonFlinkCatalog.getTable(objectPath);
+    /** Factory that uses reflection to create Iceberg Catalog instances. */
+    public static class IcebergCatalogFactory {
+
+        private IcebergCatalogFactory() {}
+
+        // Iceberg 1.4.3 is the last Java 8 compatible version, while the 
Flink 1.18+ connector
+        // requires Iceberg 1.5.0+.
+        // Using reflection to maintain Java 8 compatibility.
+        // Once Fluss drops Java 8, we can remove the reflection code
+        public static Catalog create(String catalogName, Configuration 
tableOptions) {
+            Map<String, String> catalogProperties =
+                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+            // Map "type" to "catalog-type" (equivalent)
+            // Required: either "catalog-type" (standard type) or 
"catalog-impl"
+            // (fully-qualified custom class, mandatory if "catalog-type" is 
missing)
+            if (catalogProperties.containsKey("type")) {
+                catalogProperties.put("catalog-type", 
catalogProperties.get("type"));
+            }
+            try {
+                Class<?> flinkCatalogFactoryClass =
+                        
Class.forName("org.apache.iceberg.flink.FlinkCatalogFactory");
+                Object factoryInstance =
+                        
flinkCatalogFactoryClass.getDeclaredConstructor().newInstance();
+
+                Method createCatalogMethod =
+                        flinkCatalogFactoryClass.getMethod(
+                                "createCatalog", String.class, Map.class);
+                return (Catalog)
+                        createCatalogMethod.invoke(factoryInstance, 
catalogName, catalogProperties);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to create Iceberg catalog using reflection. 
Please make sure iceberg-flink-runtime is on the classpath.",
+                        e);
+            }
+        }
     }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index de4f5ab0e..f1d195029 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -236,7 +236,12 @@
             <scope>test</scope>
         </dependency>
 
-
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-${flink.major.version}</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
new file mode 100644
index 000000000..f1305b99d
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.flink.catalog.FlinkCatalog;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link FlinkCatalog}. */
+class FlinkCatalogLakeTest extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    protected static final String CATALOG_NAME = "test_iceberg_lake";
+
+    FlinkCatalog catalog;
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        buildCatalog();
+    }
+
+    @Test
+    // TODO: remove this test in #1803
+    void testGetLakeTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(TABLE_DATALAKE_ENABLED.key(), "true");
+        ObjectPath lakeTablePath = new ObjectPath(DEFAULT_DB, "lake_table");
+        CatalogTable table = this.newCatalogTable(options);
+        catalog.createTable(lakeTablePath, table, false);
+        assertThat(catalog.tableExists(lakeTablePath)).isTrue();
+        CatalogBaseTable lakeTable =
+                catalog.getTable(new ObjectPath(DEFAULT_DB, 
"lake_table$lake"));
+        Schema schema = lakeTable.getUnresolvedSchema();
+        assertThat(schema.getColumns().size()).isEqualTo(3 + 
SYSTEM_COLUMNS.size());
+        assertThat(schema.getPrimaryKey().isPresent()).isTrue();
+        
assertThat(schema.getPrimaryKey().get().getColumnNames()).isEqualTo(List.of("first"));
+    }
+
+    private CatalogTable newCatalogTable(Map<String, String> options) {
+        ResolvedSchema resolvedSchema =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("first", 
DataTypes.STRING().notNull()),
+                                Column.physical("second", DataTypes.INT()),
+                                Column.physical("third", 
DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey("PK_first", 
List.of("first")));
+        CatalogTable origin =
+                CatalogTable.of(
+                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+        return new ResolvedCatalogTable(origin, resolvedSchema);
+    }
+
+    public void buildCatalog() {
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        catalog =
+                new FlinkCatalog(
+                        CATALOG_NAME,
+                        DEFAULT_DB,
+                        bootstrapServers,
+                        Thread.currentThread().getContextClassLoader(),
+                        Collections.emptyMap());
+        catalog.open();
+    }
+}

Reply via email to