This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 184eb8a16 [flink] Support $lake table for iceberg (#1812)
184eb8a16 is described below

commit 184eb8a1667c5925d5f141e2c02441671a4804d1
Author: MehulBatra <[email protected]>
AuthorDate: Wed Oct 15 17:03:40 2025 +0530

    [flink] Support $lake table for iceberg (#1812)
    
    ---------
    
    Co-authored-by: Mehul Batra <[email protected]>
---
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |  5 +-
 .../fluss/flink/catalog/FlinkTableFactory.java     | 15 +++-
 .../org/apache/fluss/flink/lake/LakeCatalog.java   | 32 +++++++--
 .../apache/fluss/flink/lake/LakeTableFactory.java  | 84 +++++++++++++++++++---
 fluss-lake/fluss-lake-iceberg/pom.xml              |  6 ++
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 73 +++++++++++++++++++
 6 files changed, 193 insertions(+), 22 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 6fa779387..3ecb01541 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -135,7 +135,7 @@ public class FlinkCatalog extends AbstractCatalog {
 
     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkTableFactory());
+        return Optional.of(new FlinkTableFactory(lakeCatalog));
     }
 
     @Override
@@ -336,7 +336,8 @@ public class FlinkCatalog extends AbstractCatalog {
             // should be pattern like table_name$lake
             tableName = tableComponents[0];
         } else {
-            // be something like table_name$lake$snapshot
+            // pattern is table_name$lake$snapshots
+            // Need to reconstruct: table_name + $snapshots
             tableName = String.join("", tableComponents);
         }
         return lakeCatalog
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 9ca8efae7..0b0a59981 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.catalog;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.lake.LakeCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
 import org.apache.fluss.flink.source.FlinkTableSource;
@@ -68,17 +69,25 @@ import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkOption;
 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
+    private final LakeCatalog lakeCatalog;
     private volatile LakeTableFactory lakeTableFactory;
 
+    public FlinkTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
+    }
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         // check whether should read from datalake
         ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
         String tableName = tableIdentifier.getObjectName();
         if (tableName.contains(LAKE_TABLE_SPLITTER)) {
-            tableName = tableName.substring(0, tableName.indexOf(LAKE_TABLE_SPLITTER));
+            // Extract the lake table name: for "table$lake" -> "table"
+            // for "table$lake$snapshots" -> "table$snapshots"
+            String lakeTableName = tableName.replaceFirst("\\$lake", "");
+
             lakeTableFactory = mayInitLakeTableFactory();
-            return lakeTableFactory.createDynamicTableSource(context, tableName);
+            return lakeTableFactory.createDynamicTableSource(context, lakeTableName);
         }
 
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@@ -248,7 +257,7 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
         if (lakeTableFactory == null) {
             synchronized (this) {
                 if (lakeTableFactory == null) {
-                    lakeTableFactory = new LakeTableFactory();
+                    lakeTableFactory = new LakeTableFactory(lakeCatalog);
                 }
             }
         }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
index 42f5e7008..e1f8096e6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
@@ -50,13 +50,31 @@ public class LakeCatalog {
 
     public Catalog getLakeCatalog(Configuration tableOptions) {
         DataLakeFormat lakeFormat = tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
-        // TODO: Currently, a Fluss cluster only supports a single DataLake storage.
-        // However, in the
-        //  future, it may support multiple DataLakes. The following code assumes
-        // that a single
-        //  lakeCatalog is shared across multiple tables, which will no longer be
-        // valid in such
-        //  cases and should be updated accordingly.
+        if (lakeFormat == null) {
+            throw new IllegalArgumentException(
+                    "DataLake format is not specified in table options. "
+                            + "Please ensure '"
+                            + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+                            + "' is set.");
+        }
+        return LAKE_CATALOG_CACHE.computeIfAbsent(
+                lakeFormat,
+                (dataLakeFormat) -> {
+                    if (dataLakeFormat == PAIMON) {
+                        return PaimonCatalogFactory.create(catalogName, tableOptions, classLoader);
+                    } else if (dataLakeFormat == ICEBERG) {
+                        return IcebergCatalogFactory.create(catalogName, tableOptions);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Unsupported datalake format: " + dataLakeFormat);
+                    }
+                });
+    }
+
+    public Catalog getLakeCatalog(Configuration tableOptions, DataLakeFormat lakeFormat) {
+        if (lakeFormat == null) {
+            throw new IllegalArgumentException("DataLake format cannot be null");
+        }
         return LAKE_CATALOG_CACHE.computeIfAbsent(
                 lakeFormat,
                 (dataLakeFormat) -> {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
index 975d49e95..93120e9e2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
@@ -20,37 +20,101 @@ package org.apache.fluss.flink.lake;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
 
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
+    private final LakeCatalog lakeCatalog;
 
-    // now, always assume is paimon, todo need to describe lake storage from
-    // to know which lake storage used
-    private final org.apache.paimon.flink.FlinkTableFactory paimonFlinkTableFactory;
-
-    public LakeTableFactory() {
-        paimonFlinkTableFactory = new FlinkTableFactory();
+    public LakeTableFactory(LakeCatalog lakeCatalog) {
+        this.lakeCatalog = lakeCatalog;
     }
 
     public DynamicTableSource createDynamicTableSource(
             DynamicTableFactory.Context context, String tableName) {
         ObjectIdentifier originIdentifier = context.getObjectIdentifier();
-        ObjectIdentifier paimonIdentifier =
+        ObjectIdentifier lakeIdentifier =
                 ObjectIdentifier.of(
                         originIdentifier.getCatalogName(),
                         originIdentifier.getDatabaseName(),
                         tableName);
+
+        // Determine the lake format from the table options
+        Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+
+        // Prefer the 'connector' option; fall back to 'fluss.table.datalake.format' (set by Fluss)
+        String connector = tableOptions.get("connector");
+        if (connector == null) {
+            connector = tableOptions.get("fluss.table.datalake.format");
+        }
+
+        if (connector == null) {
+            // For Paimon system tables (like table_name$options), the table options are empty,
+            // so default to Paimon for backward compatibility.
+            connector = "paimon";
+        }
+
+        // For Iceberg and Paimon, pass the table name as-is to their factory.
+        // Metadata tables will be handled internally by their respective factories.
         DynamicTableFactory.Context newContext =
                 new FactoryUtil.DefaultDynamicTableContext(
-                        paimonIdentifier,
+                        lakeIdentifier,
                         context.getCatalogTable(),
                         context.getEnrichmentOptions(),
                         context.getConfiguration(),
                         context.getClassLoader(),
                         context.isTemporary());
 
-        return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+        // Get the appropriate factory based on connector type
+        DynamicTableSourceFactory factory = getLakeTableFactory(connector, tableOptions);
+        return factory.createDynamicTableSource(newContext);
+    }
+
+    private DynamicTableSourceFactory getLakeTableFactory(
+            String connector, Map<String, String> tableOptions) {
+        if ("paimon".equalsIgnoreCase(connector)) {
+            return getPaimonFactory();
+        } else if ("iceberg".equalsIgnoreCase(connector)) {
+            return getIcebergFactory(tableOptions);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported lake connector: "
+                            + connector
+                            + ". Only 'paimon' and 'iceberg' are supported.");
+        }
+    }
+
+    private DynamicTableSourceFactory getPaimonFactory() {
+        return new org.apache.paimon.flink.FlinkTableFactory();
+    }
+
+    private DynamicTableSourceFactory getIcebergFactory(Map<String, String> tableOptions) {
+        try {
+            // Get the Iceberg FlinkCatalog instance from LakeCatalog
+            org.apache.fluss.config.Configuration flussConfig =
+                    org.apache.fluss.config.Configuration.fromMap(tableOptions);
+
+            // Get catalog with explicit ICEBERG format
+            org.apache.flink.table.catalog.Catalog catalog =
+                    lakeCatalog.getLakeCatalog(
+                            flussConfig, org.apache.fluss.metadata.DataLakeFormat.ICEBERG);
+
+            // Create FlinkDynamicTableFactory with the catalog
+            Class<?> icebergFactoryClass =
+                    Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
+            Class<?> flinkCatalogClass = Class.forName("org.apache.iceberg.flink.FlinkCatalog");
+
+            return (DynamicTableSourceFactory)
+                    icebergFactoryClass
+                            .getDeclaredConstructor(flinkCatalogClass)
+                            .newInstance(catalog);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create Iceberg table factory. Please ensure iceberg-flink-runtime is on the classpath.",
+                    e);
+        }
     }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml b/fluss-lake/fluss-lake-iceberg/pom.xml
index f1d195029..94bca76b1 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -242,6 +242,12 @@
             <version>${iceberg.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 603947f5b..0ff38190c 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -32,9 +32,11 @@ import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -51,6 +53,7 @@ import java.util.Map;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test case for union read primary key table. */
 public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
@@ -273,6 +276,76 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+        // wait until records have been synced to Iceberg
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // Test 1: Read Iceberg lake table directly using $lake suffix
+        TableResult lakeTableResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake", tableName));
+        List<Row> icebergRows = CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+        // Verify that we can read data from Iceberg via $lake suffix
+        assertThat(icebergRows).isNotEmpty();
+
+        // Note: the expected row count is based on how many rows were written.
+        // preparePKTableFullType writes 2 unique rows (by PK) per iteration, for 2 iterations.
+        // Since this is a primary key table, duplicate PKs are deduplicated, so only 2 unique rows
+        // remain per partition (or in total for a non-partitioned table).
+        int expectedUserRowCount = isPartitioned ? 2 * waitUntilPartitions(t1).size() : 2;
+        assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+        // verify rows have expected number of columns
+        int userColumnCount = lakeTableResult.getResolvedSchema().getColumnCount();
+        Row firstRow = icebergRows.get(0);
+        assertThat(firstRow.getArity())
+                .as("Iceberg row should have at least user columns")
+                .isGreaterThanOrEqualTo(userColumnCount);
+
+        // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots suffix
+        TableResult snapshotsResult =
+                batchTEnv.executeSql(String.format("select * from %s$lake$snapshots", tableName));
+        List<Row> snapshotRows = CollectionUtil.iteratorToList(snapshotsResult.collect());
+
+        // Verify that we can read snapshots from Iceberg via $lake$snapshots suffix
+        assertThat(snapshotRows).as("Should have at least one snapshot").isNotEmpty();
+
+        // Verify snapshot structure based on Iceberg snapshots table schema
+        // Expected columns: committed_at, snapshot_id, parent_id, operation, manifest_list, summary
+        Row firstSnapshot = snapshotRows.get(0);
+        assertThat(firstSnapshot.getArity()).as("Snapshot row should have 6 columns").isEqualTo(6);
+
+        // Verify committed_at field (index 0) is not null
+        assertThat(firstSnapshot.getField(0)).as("committed_at should not be null").isNotNull();
+
+        // Verify snapshot_id field (index 1) is not null
+        assertThat(firstSnapshot.getField(1)).as("snapshot_id should not be null").isNotNull();
+
+        // Verify manifest_list field (index 4) is not null and is a string path
+        assertThat(firstSnapshot.getField(4))
+                .as("manifest_list should be a non-null path")
+                .isNotNull()
+                .isInstanceOf(String.class);
+
+        // Verify summary field (index 5) is not null
+        assertThat(firstSnapshot.getField(5)).as("summary should not be null").isNotNull();
+
+        jobClient.cancel().get();
+    }
+
     private void writeFullTypeRow(TablePath tablePath, String partition) throws Exception {
         List<InternalRow> rows =
                 Collections.singletonList(

Reply via email to