This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new b85ba5a26 [flink] Get table info of primary lake table. (#2152)
b85ba5a26 is described below

commit b85ba5a263495a7ba18bb1639be6e353db42dd59
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Dec 16 19:22:19 2025 +0800

    [flink] Get table info of primary lake table. (#2152)
---
 .../apache/fluss/flink/catalog/Flink21Catalog.java | 22 +++++-
 .../fluss/flink/catalog/FlinkCatalog21Test.java    | 61 +++++++++++++++
 .../apache/fluss/flink/catalog/FlinkCatalog.java   | 19 ++++-
 .../fluss/flink/catalog/FlinkCatalogTest.java      | 91 ++++++++++++++++------
 4 files changed, 166 insertions(+), 27 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
index 4f567aae0..7a6f737e0 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
@@ -18,8 +18,10 @@
 
 package org.apache.fluss.flink.catalog;
 
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.metadata.TableInfo;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -44,11 +46,29 @@ public class Flink21Catalog extends FlinkCatalog {
         super(name, defaultDatabase, bootstrapServers, classLoader, 
securityConfigs);
     }
 
+    @VisibleForTesting
+    public Flink21Catalog(
+            String name,
+            String defaultDatabase,
+            String bootstrapServers,
+            ClassLoader classLoader,
+            Map<String, String> securityConfigs,
+            LakeFlinkCatalog lakeFlinkCatalog) {
+        super(
+                name,
+                defaultDatabase,
+                bootstrapServers,
+                classLoader,
+                securityConfigs,
+                lakeFlinkCatalog);
+    }
+
     @Override
     public CatalogBaseTable getTable(ObjectPath objectPath)
             throws TableNotExistException, CatalogException {
         CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
-        if (!(catalogBaseTable instanceof CatalogTable)) {
+        if (!(catalogBaseTable instanceof CatalogTable)
+                || objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
             return catalogBaseTable;
         }
 
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
new file mode 100644
index 000000000..e66a625e5
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Test for {@link Flink21Catalog}. */
+public class FlinkCatalog21Test extends FlinkCatalogTest {
+
+    @Override
+    protected FlinkCatalog initCatalog(
+            String catalogName,
+            String databaseName,
+            String bootstrapServers,
+            LakeFlinkCatalog lakeFlinkCatalog) {
+        return new Flink21Catalog(
+                catalogName,
+                databaseName,
+                bootstrapServers,
+                Thread.currentThread().getContextClassLoader(),
+                Collections.emptyMap(),
+                lakeFlinkCatalog);
+    }
+
+    protected ResolvedSchema createSchema() {
+        return new ResolvedSchema(
+                Arrays.asList(
+                        Column.physical("first", DataTypes.STRING().notNull()),
+                        Column.physical("second", DataTypes.INT()),
+                        Column.physical("third", 
DataTypes.STRING().notNull())),
+                Collections.emptyList(),
+                UniqueConstraint.primaryKey("PK_first_third", 
Arrays.asList("first", "third")),
+                Collections.singletonList(
+                        DefaultIndex.newIndex(
+                                "INDEX_first_third", Arrays.asList("first", 
"third"))));
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 16dc60c86..16b05089a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -124,13 +124,30 @@ public class FlinkCatalog extends AbstractCatalog {
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs) {
+        this(
+                name,
+                defaultDatabase,
+                bootstrapServers,
+                classLoader,
+                securityConfigs,
+                new LakeFlinkCatalog(name, classLoader));
+    }
+
+    @VisibleForTesting
+    public FlinkCatalog(
+            String name,
+            String defaultDatabase,
+            String bootstrapServers,
+            ClassLoader classLoader,
+            Map<String, String> securityConfigs,
+            LakeFlinkCatalog lakeFlinkCatalog) {
         super(name, defaultDatabase);
         this.catalogName = name;
         this.defaultDatabase = defaultDatabase;
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
-        this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
+        this.lakeFlinkCatalog = lakeFlinkCatalog;
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index abaf8e7e0..c32492c4b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.IllegalConfigurationException;
 import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.utils.FlinkConversionsTest;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.ExceptionUtils;
@@ -36,6 +37,7 @@ import 
org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -58,8 +60,7 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -78,6 +79,7 @@ import static 
org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
 import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.fluss.flink.adapter.CatalogTableAdapter.toCatalogTable;
 import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
 import static 
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
 import static 
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema;
@@ -101,7 +103,8 @@ class FlinkCatalogTest {
     private static final FlinkConversionsTest.TestRefreshHandler 
REFRESH_HANDLER =
             new FlinkConversionsTest.TestRefreshHandler("jobID: xxx, 
clusterId: yyy");
 
-    static Catalog catalog;
+    private Catalog catalog;
+    private MockLakeFlinkCatalog mockLakeCatalog;
     private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, 
"t1");
 
     private static Configuration initConfig() {
@@ -110,7 +113,7 @@ class FlinkCatalogTest {
         return configuration;
     }
 
-    private ResolvedSchema createSchema() {
+    protected ResolvedSchema createSchema() {
         return new ResolvedSchema(
                 Arrays.asList(
                         Column.physical("first", DataTypes.STRING().notNull()),
@@ -128,7 +131,7 @@ class FlinkCatalogTest {
     private CatalogTable newCatalogTable(
             ResolvedSchema resolvedSchema, Map<String, String> options) {
         CatalogTable origin =
-                CatalogTable.of(
+                toCatalogTable(
                         
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                         "test comment",
                         Collections.emptyList(),
@@ -158,29 +161,36 @@ class FlinkCatalogTest {
         return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
     }
 
-    @BeforeAll
-    static void beforeAll() {
+    protected FlinkCatalog initCatalog(
+            String catalogName,
+            String databaseName,
+            String bootstrapServers,
+            LakeFlinkCatalog lakeFlinkCatalog) {
+        return new FlinkCatalog(
+                catalogName,
+                databaseName,
+                bootstrapServers,
+                Thread.currentThread().getContextClassLoader(),
+                Collections.emptyMap(),
+                lakeFlinkCatalog);
+    }
+
+    @BeforeEach
+    void beforeEach() throws Exception {
         // set fluss conf
         Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+
+        mockLakeCatalog =
+                new MockLakeFlinkCatalog(
+                        CATALOG_NAME, 
Thread.currentThread().getContextClassLoader());
         catalog =
-                new FlinkCatalog(
+                initCatalog(
                         CATALOG_NAME,
                         DEFAULT_DB,
                         String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
-                        Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        mockLakeCatalog);
         catalog.open();
-    }
 
-    @AfterAll
-    static void afterAll() {
-        if (catalog != null) {
-            catalog.close();
-        }
-    }
-
-    @BeforeEach
-    void beforeEach() throws Exception {
         // First check if database exists, and drop it if it does
         if (catalog.databaseExists(DEFAULT_DB)) {
             catalog.dropDatabase(DEFAULT_DB, true, true);
@@ -198,6 +208,13 @@ class FlinkCatalogTest {
         }
     }
 
+    @AfterEach
+    void afterEach() {
+        if (catalog != null) {
+            catalog.close();
+        }
+    }
+
     @Test
     void testCreateTable() throws Exception {
         Map<String, String> options = new HashMap<>();
@@ -270,7 +287,7 @@ class FlinkCatalogTest {
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable table2 =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(
+                        toCatalogTable(
                                 
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                                 "test comment",
                                 Collections.singletonList("first"),
@@ -306,6 +323,11 @@ class FlinkCatalogTest {
         CatalogTable table = this.newCatalogTable(options);
         catalog.createTable(lakeTablePath, table, false);
         assertThat(catalog.tableExists(lakeTablePath)).isTrue();
+        // get the lake table from lake catalog.
+        mockLakeCatalog.registerLakeTable(lakeTablePath, table);
+        assertThat((CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
"lake_table$lake")))
+                .isEqualTo(table);
+
         // drop fluss table
         catalog.dropTable(lakeTablePath, false);
         assertThat(catalog.tableExists(lakeTablePath)).isFalse();
@@ -363,7 +385,7 @@ class FlinkCatalogTest {
                         UniqueConstraint.primaryKey(
                                 "PK_first", 
Collections.singletonList("first")));
         CatalogTable origin =
-                CatalogTable.of(
+                toCatalogTable(
                         
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                         "test comment",
                         Collections.emptyList(),
@@ -648,7 +670,7 @@ class FlinkCatalogTest {
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable table2 =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(
+                        toCatalogTable(
                                 
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                                 "test comment",
                                 Collections.singletonList("first"),
@@ -758,7 +780,7 @@ class FlinkCatalogTest {
         ObjectPath partitionedPath = new ObjectPath(DEFAULT_DB, 
"partitioned_table1");
         CatalogTable partitionedTable =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(
+                        toCatalogTable(
                                 
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                                 "test comment",
                                 Collections.singletonList("first"),
@@ -845,7 +867,7 @@ class FlinkCatalogTest {
         ResolvedSchema schema = createSchema();
         CatalogTable partTable =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(
+                        toCatalogTable(
                                 
Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "partitioned table for stats",
                                 Collections.singletonList("first"),
@@ -974,4 +996,23 @@ class FlinkCatalogTest {
         checkEqualsRespectSchema((CatalogTable) tableCreated, table);
         catalog.dropTable(tablePath, false);
     }
+
+    private static class MockLakeFlinkCatalog extends LakeFlinkCatalog {
+        private final GenericInMemoryCatalog catalog;
+
+        public MockLakeFlinkCatalog(String catalogName, ClassLoader 
classLoader) {
+            super(catalogName, classLoader);
+            catalog = new GenericInMemoryCatalog(catalogName, DEFAULT_DB);
+        }
+
+        @Override
+        public Catalog getLakeCatalog(Configuration tableOptions) {
+            return catalog;
+        }
+
+        void registerLakeTable(ObjectPath tablePath, CatalogTable table)
+                throws TableAlreadyExistException, DatabaseNotExistException {
+            catalog.createTable(tablePath, table, false);
+        }
+    }
 }

Reply via email to