This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ee33fa254 [flink] Support Delta Join on Flink 2.1 (#1726)
ee33fa254 is described below

commit ee33fa254d22ef2a851196975837a1cfaf0f7652
Author: Xuyang <[email protected]>
AuthorDate: Sat Sep 27 17:53:54 2025 +0800

    [flink] Support Delta Join on Flink 2.1 (#1726)
---
 .../apache/fluss/flink/catalog/Flink21Catalog.java | 106 +++++++++++++
 .../fluss/flink/catalog/Flink21CatalogFactory.java |  34 ++++
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../fluss/flink/catalog/Flink21CatalogITCase.java  | 174 ++++++++++++++++++++-
 .../flink/source/Flink21TableSourceITCase.java     | 128 ++++++++++++++-
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |   2 +-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    |  17 +-
 7 files changed, 457 insertions(+), 6 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
new file mode 100644
index 000000000..4f567aae0
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+import org.apache.fluss.metadata.TableInfo;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** A {@link FlinkCatalog} used for Flink 2.1. */
+public class Flink21Catalog extends FlinkCatalog {
+
+    public Flink21Catalog(
+            String name,
+            String defaultDatabase,
+            String bootstrapServers,
+            ClassLoader classLoader,
+            Map<String, String> securityConfigs) {
+        super(name, defaultDatabase, bootstrapServers, classLoader, 
securityConfigs);
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath objectPath)
+            throws TableNotExistException, CatalogException {
+        CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
+        if (!(catalogBaseTable instanceof CatalogTable)) {
+            return catalogBaseTable;
+        }
+
+        CatalogTable table = (CatalogTable) catalogBaseTable;
+        Optional<Schema.UnresolvedPrimaryKey> pkOp = 
table.getUnresolvedSchema().getPrimaryKey();
+        // If there is no pk, return directly.
+        if (pkOp.isEmpty()) {
+            return table;
+        }
+
+        Schema.Builder newSchemaBuilder =
+                Schema.newBuilder().fromSchema(table.getUnresolvedSchema());
+        // Pk is always an index.
+        newSchemaBuilder.index(pkOp.get().getColumnNames());
+
+        // Judge whether we can do prefix lookup.
+        TableInfo tableInfo = 
connection.getTable(toTablePath(objectPath)).getTableInfo();
+        List<String> bucketKeys = tableInfo.getBucketKeys();
+        // For partition table, the physical primary key is the primary key 
that excludes the
+        // partition key
+        List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
+        List<String> indexKeys = new ArrayList<>();
+        if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
+            indexKeys.addAll(bucketKeys);
+            if (tableInfo.isPartitioned()) {
+                indexKeys.addAll(tableInfo.getPartitionKeys());
+            }
+        }
+
+        if (!indexKeys.isEmpty()) {
+            newSchemaBuilder.index(indexKeys);
+        }
+        return CatalogTable.newBuilder()
+                .schema(newSchemaBuilder.build())
+                .comment(table.getComment())
+                .partitionKeys(table.getPartitionKeys())
+                .options(table.getOptions())
+                .snapshot(table.getSnapshot().orElse(null))
+                .distribution(table.getDistribution().orElse(null))
+                .build();
+    }
+
+    private static boolean isPrefixList(List<String> fullList, List<String> 
prefixList) {
+        if (fullList.size() <= prefixList.size()) {
+            return false;
+        }
+
+        for (int i = 0; i < prefixList.size(); i++) {
+            if (!fullList.get(i).equals(prefixList.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
new file mode 100644
index 000000000..8557a552f
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+/** A {@link FlinkCatalogFactory} used for Flink 2.1. */
+public class Flink21CatalogFactory extends FlinkCatalogFactory {
+
+    @Override
+    public FlinkCatalog createCatalog(Context context) {
+        FlinkCatalog catalog = super.createCatalog(context);
+        return new Flink21Catalog(
+                catalog.catalogName,
+                catalog.defaultDatabase,
+                catalog.bootstrapServers,
+                catalog.classLoader,
+                catalog.securityConfigs);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 6544cb534..f13f71331 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,4 +16,4 @@
 # limitations under the License.
 #
 
-org.apache.fluss.flink.catalog.FlinkCatalogFactory
\ No newline at end of file
+org.apache.fluss.flink.catalog.Flink21CatalogFactory
\ No newline at end of file
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
index b12aaa5e8..c0b9b9196 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
@@ -17,5 +17,177 @@
 
 package org.apache.fluss.flink.catalog;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** IT case for catalog in Flink 2.1. */
-public class Flink21CatalogITCase extends FlinkCatalogITCase {}
+public class Flink21CatalogITCase extends FlinkCatalogITCase {
+
+    @BeforeAll
+    static void beforeAll() {
+        FlinkCatalogITCase.beforeAll();
+
+        // close the old one and open a new one later
+        catalog.close();
+
+        catalog =
+                new Flink21Catalog(
+                        catalog.catalogName,
+                        catalog.defaultDatabase,
+                        catalog.bootstrapServers,
+                        catalog.classLoader,
+                        catalog.securityConfigs);
+        catalog.open();
+    }
+
+    @Test
+    void testGetTableWithIndex() throws Exception {
+        String tableName = "table_with_pk_only";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " primary key (a, b) NOT ENFORCED"
+                                + ") with ( "
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        tableName));
+        CatalogTable table = (CatalogTable) catalog.getTable(new 
ObjectPath(DEFAULT_DB, tableName));
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT().notNull())
+                        .column("b", DataTypes.STRING().notNull())
+                        .column("c", DataTypes.BIGINT())
+                        .primaryKey("a", "b")
+                        .index("a", "b")
+                        .build();
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_prefix_bucket_key";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " primary key (a, b) NOT ENFORCED"
+                                + ") with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'a'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
tableName));
+        expectedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT().notNull())
+                        .column("b", DataTypes.STRING().notNull())
+                        .column("c", DataTypes.BIGINT())
+                        .primaryKey("a", "b")
+                        .index("a", "b")
+                        .index("a")
+                        .build();
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_bucket_key_is_not_prefix_pk";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " primary key (a, b) NOT ENFORCED"
+                                + ") with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'b'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
tableName));
+        expectedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT().notNull())
+                        .column("b", DataTypes.STRING().notNull())
+                        .column("c", DataTypes.BIGINT())
+                        .primaryKey("a", "b")
+                        .index("a", "b")
+                        .build();
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_partition_1";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " dt varchar, "
+                                + " primary key (a, b, dt) NOT ENFORCED "
+                                + ") "
+                                + " partitioned by (dt) "
+                                + " with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'a'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
tableName));
+        expectedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT().notNull())
+                        .column("b", DataTypes.STRING().notNull())
+                        .column("c", DataTypes.BIGINT())
+                        .column("dt", DataTypes.STRING().notNull())
+                        .primaryKey("a", "b", "dt")
+                        .index("a", "b", "dt")
+                        .index("a", "dt")
+                        .build();
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+
+        tableName = "table_with_partition_2";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a int, "
+                                + " b varchar, "
+                                + " c bigint, "
+                                + " dt varchar, "
+                                + " primary key (dt, a, b) NOT ENFORCED "
+                                + ") "
+                                + " partitioned by (dt) "
+                                + " with ( "
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'a'"
+                                + ")",
+                        tableName));
+
+        table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
tableName));
+        expectedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT().notNull())
+                        .column("b", DataTypes.STRING().notNull())
+                        .column("c", DataTypes.BIGINT())
+                        .column("dt", DataTypes.STRING().notNull())
+                        .primaryKey("dt", "a", "b")
+                        .index("dt", "a", "b")
+                        .index("a", "dt")
+                        .build();
+        assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+    }
+
+    @Override
+    protected void addDefaultIndexKey(Schema.Builder schemaBuilder) {
+        super.addDefaultIndexKey(schemaBuilder);
+
+        Schema currentSchema = schemaBuilder.build();
+        currentSchema.getPrimaryKey().ifPresent(pk -> 
schemaBuilder.index(pk.getColumnNames()));
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
index b337cab46..9b1e908da 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
@@ -17,5 +17,131 @@
 
 package org.apache.fluss.flink.source;
 
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** IT case for {@link FlinkTableSource} in Flink 2.1. */
-public class Flink21TableSourceITCase extends FlinkTableSourceITCase {}
+public class Flink21TableSourceITCase extends FlinkTableSourceITCase {
+
+    @Test
+    void testDeltaJoin() throws Exception {
+        // start two jobs for this test: one for DML involving the delta join, 
and the other for DQL
+        // to query the results of the sink table
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
+
+        String leftTableName = "left_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                // currently, delta join only support 
append-only source
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L),
+                        row(4, "v4", 400L, 4, 40000L));
+        // write records
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                // currently, delta join only support 
append-only source
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v3", 200L, 2, 20000L),
+                        row(3, "v4", 300L, 4, 30000L),
+                        row(4, "v4", 500L, 4, 50000L));
+        // write records
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+                        "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+                        "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 13bd4b019..97933b035 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -114,7 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String catalogName;
     protected final String defaultDatabase;
     protected final String bootstrapServers;
-    private final Map<String, String> securityConfigs;
+    protected final Map<String, String> securityConfigs;
     protected Connection connection;
     protected Admin admin;
     private volatile @Nullable LakeCatalog lakeCatalog;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 300e639e4..062c6eea7 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -86,7 +86,7 @@ abstract class FlinkCatalogITCase {
 
     static final String CATALOG_NAME = "testcatalog";
     static final String DEFAULT_DB = 
FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue();
-    static Catalog catalog;
+    static FlinkCatalog catalog;
 
     protected TableEnvironment tEnv;
 
@@ -181,6 +181,7 @@ abstract class FlinkCatalogITCase {
                 .column("r", DataTypes.TIMESTAMP_LTZ())
                 .column("s", DataTypes.ROW(DataTypes.FIELD("a", 
DataTypes.INT())))
                 .primaryKey("a");
+        addDefaultIndexKey(schemaBuilder);
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
"test_table"));
@@ -291,6 +292,7 @@ abstract class FlinkCatalogITCase {
         tEnv.executeSql("create table append_only_table(a int, b int) with 
('bucket.num' = '10')");
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("a", DataTypes.INT()).column("b", 
DataTypes.INT());
+        addDefaultIndexKey(schemaBuilder);
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
"append_only_table"));
@@ -313,6 +315,7 @@ abstract class FlinkCatalogITCase {
                 .column("a", DataTypes.INT())
                 .column("b", DataTypes.STRING())
                 .column("dt", DataTypes.STRING());
+        addDefaultIndexKey(schemaBuilder);
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
@@ -366,6 +369,7 @@ abstract class FlinkCatalogITCase {
                         + " 'table.auto-partition.time-unit' = 'year')");
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("a", DataTypes.INT()).column("b", 
DataTypes.STRING());
+        addDefaultIndexKey(schemaBuilder);
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
@@ -475,6 +479,7 @@ abstract class FlinkCatalogITCase {
                 .column("b", DataTypes.STRING())
                 .column("c", DataTypes.STRING())
                 .column("hh", DataTypes.STRING());
+        addDefaultIndexKey(schemaBuilder);
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
@@ -559,6 +564,7 @@ abstract class FlinkCatalogITCase {
                 .column("order_time", DataTypes.TIMESTAMP(3))
                 .watermark("order_time", "`order_time` - INTERVAL '5' SECOND")
                 .primaryKey("user");
+        addDefaultIndexKey(schemaBuilder);
         Schema expectedSchema = schemaBuilder.build();
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
         Map<String, String> expectedOptions = new HashMap<>();
@@ -775,7 +781,14 @@ abstract class FlinkCatalogITCase {
                         "The configured default-database 'non-exist' does not 
exist in the Fluss cluster.");
     }
 
-    private static void assertOptionsEqual(
+    /**
+     * Before Flink 2.1, the {@link Schema} did not include an index field. 
Starting from Flink 2.1,
+     * Flink introduced the concept of an index, and in Fluss, the primary key 
is considered as an
+     * index.
+     */
+    protected void addDefaultIndexKey(Schema.Builder schemaBuilder) {}
+
+    protected static void assertOptionsEqual(
             Map<String, String> actualOptions, Map<String, String> 
expectedOptions) {
         actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
         actualOptions.remove(ConfigOptions.TABLE_REPLICATION_FACTOR.key());

Reply via email to