This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 684a43de2 [flink] Support Flink 2.2 and remove Flink 2.1 (#2167)
684a43de2 is described below

commit 684a43de2388b28e0cc7a427471e3e17423b1c66
Author: vamossagar12 <[email protected]>
AuthorDate: Wed Dec 24 16:48:38 2025 +0530

    [flink] Support Flink 2.2 and remove Flink 2.1 (#2167)
    
    ---------
    
    Co-authored-by: Hongshun Wang <[email protected]>
    Co-authored-by: Jark Wu <[email protected]>
---
 .github/workflows/stage.sh                         |   2 +-
 .../apache/fluss/flink/adapter/SinkAdapter.java}   |  29 +-
 .../org/apache/flink/api/connector/sink2/Sink.java |  61 ----
 .../apache/fluss/flink/catalog/Flink21Catalog.java | 136 ---------
 .../fluss/flink/catalog/Flink21CatalogFactory.java |  35 ---
 .../flink/source/Flink21TableSourceITCase.java     | 147 ----------
 .../{fluss-flink-2.1 => fluss-flink-2.2}/pom.xml   |  22 +-
 .../adapter/MultipleParameterToolAdapter.java      |   0
 .../apache/fluss/flink/adapter/SchemaAdapter.java} |  28 +-
 .../apache/fluss/flink/adapter/SinkAdapter.java}   |  31 +--
 .../org.apache.flink.table.factories.Factory       |   2 +-
 .../adapter/Flink22MultipleParameterToolTest.java} |   4 +-
 .../ResolvedCatalogMaterializedTableAdapter.java   |  41 +++
 .../fluss/flink/catalog/Flink22CatalogITCase.java} |  23 +-
 .../fluss/flink/catalog/Flink22CatalogTest.java}   |  22 +-
 .../catalog/Flink22MaterializedTableITCase.java}   |   4 +-
 .../fluss/flink/metrics/Flink22MetricsITCase.java} |   4 +-
 .../flink/procedure/Flink22ProcedureITCase.java}   |   4 +-
 .../security/acl/Flink22AuthorizationITCase.java}  |   4 +-
 .../flink/sink/Flink22ComplexTypeITCase.java}      |   4 +-
 .../fluss/flink/sink/Flink22TableSinkITCase.java}  |   4 +-
 .../source/Flink22TableSourceBatchITCase.java}     |   4 +-
 .../source/Flink22TableSourceFailOverITCase.java}  |   4 +-
 .../flink/source/Flink22TableSourceITCase.java     | 309 +++++++++++++++++++++
 .../org.junit.jupiter.api.extension.Extension      |   0
 .../src/test/resources/log4j2-test.properties      |   0
 .../apache/fluss/flink/adapter/SchemaAdapter.java} |  26 +-
 ...ngleThreadMultiplexSourceReaderBaseAdapter.java |   2 +
 .../apache/fluss/flink/adapter/SinkAdapter.java    |  48 ++++
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |  60 +++-
 .../org/apache/fluss/flink/sink/FlinkSink.java     |  24 +-
 .../ResolvedCatalogMaterializedTableAdapter.java   |  41 +++
 .../fluss/flink/catalog/FlinkCatalogTest.java      |   8 +-
 .../flink/source/FlinkTableSourceBatchITCase.java  |   8 +-
 fluss-flink/pom.xml                                |   2 +-
 fluss-test-coverage/pom.xml                        |   3 +-
 36 files changed, 635 insertions(+), 511 deletions(-)

diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index 49a3ed57e..18587dc29 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -24,7 +24,7 @@ STAGE_LAKE="lake"
 MODULES_FLINK="\
 fluss-flink,\
 fluss-flink/fluss-flink-common,\
-fluss-flink/fluss-flink-2.1,\
+fluss-flink/fluss-flink-2.2,\
 fluss-flink/fluss-flink-1.20,\
 "
 
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
similarity index 54%
copy from 
fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
copy to 
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
index 076dcb86c..575bfdfbd 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
+++ 
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -17,30 +17,25 @@
 
 package org.apache.fluss.flink.adapter;
 
-import org.apache.flink.util.MultipleParameterTool;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
-import java.util.Map;
+import java.io.IOException;
 
 /**
- * An adapter for Flink {@link MultipleParameterTool} class. The {@link MultipleParameterTool} is
- * moved to a new package since Flink 2.x, so this adapter helps to bridge compatibility for
- * different Flink versions.
+ * Flink sink adapter that hides the version-specific createWriter method signatures.
  *
  * <p>TODO: remove this class once support for the Flink 1.x series is dropped.
  */
-public class MultipleParameterToolAdapter {
+public abstract class SinkAdapter<InputT> implements Sink<InputT> {
 
-    private MultipleParameterToolAdapter() {}
-
-    private MultipleParameterTool multipleParameterTool;
-
-    public static MultipleParameterToolAdapter fromArgs(String[] args) {
-        MultipleParameterToolAdapter adapter = new MultipleParameterToolAdapter();
-        adapter.multipleParameterTool = MultipleParameterTool.fromArgs(args);
-        return adapter;
+    @Override
+    public SinkWriter<InputT> createWriter(InitContext initContext) throws IOException {
+        return createWriter(initContext.getMailboxExecutor(), initContext.metricGroup());
     }
 
-    public Map<String, String> toMap() {
-        return this.multipleParameterTool.toMap();
-    }
+    protected abstract SinkWriter<InputT> createWriter(
+            MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup);
 }
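
For readers porting a sink onto this adapter: a concrete sink only implements the two-argument factory method, while SinkAdapter hides whether the runtime passed an InitContext (Flink 1.x) or a WriterInitContext (Flink 2.x). A minimal illustrative sketch, not part of this commit (PrintSink and its anonymous writer are hypothetical names):

    import org.apache.fluss.flink.adapter.SinkAdapter;

    import org.apache.flink.api.common.operators.MailboxExecutor;
    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

    /** Hypothetical sink that prints each record; for illustration only. */
    public class PrintSink extends SinkAdapter<String> {

        @Override
        protected SinkWriter<String> createWriter(
                MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup) {
            // The adapter has already unwrapped the version-specific init context, so only
            // the common pieces (mailbox executor, metric group) are visible here.
            return new SinkWriter<String>() {
                @Override
                public void write(String element, Context context) {
                    System.out.println(element);
                }

                @Override
                public void flush(boolean endOfInput) {}

                @Override
                public void close() {}
            };
        }
    }
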
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
deleted file mode 100644
index 8cab1cb53..000000000
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Placeholder class to resolve compatibility issues. This placeholder class 
can be removed once we
- * drop the support of Flink v1.18 which requires the legacy InitContext 
interface.
- */
-public interface Sink<InputT> extends Serializable {
-
-    /**
-     * Creates a {@link SinkWriter}.
-     *
-     * @param context the runtime context.
-     * @return A sink writer.
-     * @throws IOException for any failure during creation.
-     */
-    SinkWriter<InputT> createWriter(WriterInitContext context) throws 
IOException;
-
-    /** The interface exposes some runtime info for creating a {@link 
SinkWriter}. */
-    interface InitContext {
-
-        /**
-         * Returns the mailbox executor that allows to execute {@link 
Runnable}s inside the task
-         * thread in between record processing.
-         *
-         * <p>Note that this method should not be used per-record for 
performance reasons in the
-         * same way as records should not be sent to the external system 
individually. Rather,
-         * implementers are expected to batch records and only enqueue a 
single {@link Runnable} per
-         * batch to handle the result.
-         */
-        MailboxExecutor getMailboxExecutor();
-
-        /**
-         * @return The metric group this writer belongs to.
-         */
-        SinkWriterMetricGroup metricGroup();
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
deleted file mode 100644
index d14e3af6f..000000000
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.catalog;
-
-import org.apache.fluss.flink.lake.LakeFlinkCatalog;
-import org.apache.fluss.metadata.TableInfo;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-/** A {@link FlinkCatalog} used for Flink 2.1. */
-public class Flink21Catalog extends FlinkCatalog {
-
-    public Flink21Catalog(
-            String name,
-            String defaultDatabase,
-            String bootstrapServers,
-            ClassLoader classLoader,
-            Map<String, String> securityConfigs,
-            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
-        super(
-                name,
-                defaultDatabase,
-                bootstrapServers,
-                classLoader,
-                securityConfigs,
-                lakeCatalogPropertiesSupplier);
-    }
-
-    @VisibleForTesting
-    public Flink21Catalog(
-            String name,
-            String defaultDatabase,
-            String bootstrapServers,
-            ClassLoader classLoader,
-            Map<String, String> securityConfigs,
-            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
-            LakeFlinkCatalog lakeFlinkCatalog) {
-        super(
-                name,
-                defaultDatabase,
-                bootstrapServers,
-                classLoader,
-                securityConfigs,
-                lakeCatalogPropertiesSupplier,
-                lakeFlinkCatalog);
-    }
-
-    @Override
-    public CatalogBaseTable getTable(ObjectPath objectPath)
-            throws TableNotExistException, CatalogException {
-        CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
-        if (!(catalogBaseTable instanceof CatalogTable)
-                || objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
-            return catalogBaseTable;
-        }
-
-        CatalogTable table = (CatalogTable) catalogBaseTable;
-        Optional<Schema.UnresolvedPrimaryKey> pkOp = 
table.getUnresolvedSchema().getPrimaryKey();
-        // If there is no pk, return directly.
-        if (pkOp.isEmpty()) {
-            return table;
-        }
-
-        Schema.Builder newSchemaBuilder =
-                Schema.newBuilder().fromSchema(table.getUnresolvedSchema());
-        // Pk is always an index.
-        newSchemaBuilder.index(pkOp.get().getColumnNames());
-
-        // Judge whether we can do prefix lookup.
-        TableInfo tableInfo = 
connection.getTable(toTablePath(objectPath)).getTableInfo();
-        List<String> bucketKeys = tableInfo.getBucketKeys();
-        // For partition table, the physical primary key is the primary key 
that excludes the
-        // partition key
-        List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
-        List<String> indexKeys = new ArrayList<>();
-        if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
-            indexKeys.addAll(bucketKeys);
-            if (tableInfo.isPartitioned()) {
-                indexKeys.addAll(tableInfo.getPartitionKeys());
-            }
-        }
-
-        if (!indexKeys.isEmpty()) {
-            newSchemaBuilder.index(indexKeys);
-        }
-        return CatalogTable.newBuilder()
-                .schema(newSchemaBuilder.build())
-                .comment(table.getComment())
-                .partitionKeys(table.getPartitionKeys())
-                .options(table.getOptions())
-                .snapshot(table.getSnapshot().orElse(null))
-                .distribution(table.getDistribution().orElse(null))
-                .build();
-    }
-
-    private static boolean isPrefixList(List<String> fullList, List<String> 
prefixList) {
-        if (fullList.size() <= prefixList.size()) {
-            return false;
-        }
-
-        for (int i = 0; i < prefixList.size(); i++) {
-            if (!fullList.get(i).equals(prefixList.get(i))) {
-                return false;
-            }
-        }
-        return true;
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
deleted file mode 100644
index cff44ab86..000000000
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.catalog;
-
-/** A {@link FlinkCatalogFactory} used for Flink 2.1. */
-public class Flink21CatalogFactory extends FlinkCatalogFactory {
-
-    @Override
-    public FlinkCatalog createCatalog(Context context) {
-        FlinkCatalog catalog = super.createCatalog(context);
-        return new Flink21Catalog(
-                catalog.catalogName,
-                catalog.defaultDatabase,
-                catalog.bootstrapServers,
-                catalog.classLoader,
-                catalog.securityConfigs,
-                catalog.lakeCatalogPropertiesSupplier);
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
deleted file mode 100644
index 9b1e908da..000000000
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.source;
-
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** IT case for {@link FlinkTableSource} in Flink 2.1. */
-public class Flink21TableSourceITCase extends FlinkTableSourceITCase {
-
-    @Test
-    void testDeltaJoin() throws Exception {
-        // start two jobs for this test: one for DML involving the delta join, 
and the other for DQL
-        // to query the results of the sink table
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 2);
-
-        String leftTableName = "left_table";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " b1 varchar, "
-                                + " c1 bigint, "
-                                + " d1 int, "
-                                + " e1 bigint, "
-                                + " primary key (c1, d1) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c1', "
-                                // currently, delta join only support 
append-only source
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        leftTableName));
-        List<InternalRow> rows1 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v2", 200L, 2, 20000L),
-                        row(3, "v1", 300L, 3, 30000L),
-                        row(4, "v4", 400L, 4, 40000L));
-        // write records
-        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
-        writeRows(conn, leftTablePath, rows1, false);
-
-        String rightTableName = "right_table";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " a2 int, "
-                                + " b2 varchar, "
-                                + " c2 bigint, "
-                                + " d2 int, "
-                                + " e2 bigint, "
-                                + " primary key (c2, d2) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss', "
-                                + " 'bucket.key' = 'c2', "
-                                // currently, delta join only support 
append-only source
-                                + " 'table.merge-engine' = 'first_row' "
-                                + ")",
-                        rightTableName));
-        List<InternalRow> rows2 =
-                Arrays.asList(
-                        row(1, "v1", 100L, 1, 10000L),
-                        row(2, "v3", 200L, 2, 20000L),
-                        row(3, "v4", 300L, 4, 30000L),
-                        row(4, "v4", 500L, 4, 50000L));
-        // write records
-        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
-        writeRows(conn, rightTablePath, rows2, false);
-
-        String sinkTableName = "sink_table";
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ( "
-                                + " a1 int, "
-                                + " b1 varchar, "
-                                + " c1 bigint, "
-                                + " d1 int, "
-                                + " e1 bigint, "
-                                + " a2 int, "
-                                + " b2 varchar, "
-                                + " c2 bigint, "
-                                + " d2 int, "
-                                + " e2 bigint, "
-                                + " primary key (c1, d1, c2, d2) NOT ENFORCED"
-                                + ") with ("
-                                + " 'connector' = 'fluss' "
-                                + ")",
-                        sinkTableName));
-
-        tEnv.getConfig()
-                .set(
-                        
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
-                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
-        String sql =
-                String.format(
-                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
-                        sinkTableName, leftTableName, rightTableName);
-
-        assertThat(tEnv.explainSql(sql))
-                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
-
-        tEnv.executeSql(sql);
-
-        CloseableIterator<Row> collected =
-                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
-        List<String> expected =
-                Arrays.asList(
-                        "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
-                        "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
-                        "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
-                        "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
-        assertResultsIgnoreOrder(collected, expected, true);
-    }
-}
diff --git a/fluss-flink/fluss-flink-2.1/pom.xml 
b/fluss-flink/fluss-flink-2.2/pom.xml
similarity index 93%
rename from fluss-flink/fluss-flink-2.1/pom.xml
rename to fluss-flink/fluss-flink-2.2/pom.xml
index 5b5475f3d..c6527e282 100644
--- a/fluss-flink/fluss-flink-2.1/pom.xml
+++ b/fluss-flink/fluss-flink-2.2/pom.xml
@@ -26,11 +26,11 @@
         <version>0.9-SNAPSHOT</version>
     </parent>
 
-    <artifactId>fluss-flink-2.1</artifactId>
-    <name>Fluss : Engine Flink : 2.1 </name>
+    <artifactId>fluss-flink-2.2</artifactId>
+    <name>Fluss : Engine Flink : 2.2 </name>
     <properties>
-        <flink.major.version>2.1</flink.major.version>
-        <flink.minor.version>2.1.1</flink.minor.version>
+        <flink.major.version>2.2</flink.major.version>
+        <flink.minor.version>2.2.0</flink.minor.version>
     </properties>
 
     <dependencies>
@@ -68,6 +68,20 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.minor.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.minor.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependency -->
         <dependency>
             <groupId>org.apache.fluss</groupId>
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
 
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
similarity index 100%
copy from 
fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
copy to 
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
 
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
similarity index 53%
rename from 
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
rename to 
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
index 6caa32d7c..8f6492bd1 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
@@ -15,7 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.sink2;
+package org.apache.fluss.flink.adapter;
 
-/** Placeholder class to resolve compatibility issues. */
-public interface WriterInitContext extends Sink.InitContext {}
+import org.apache.flink.table.api.Schema;
+
+import java.util.List;
+
+/**
+ * An adapter for building a Flink {@link Schema} with indexes.
+ *
+ * <p>TODO: remove this class once support for the Flink 1.x series is dropped.
+ */
+public class SchemaAdapter {
+    private SchemaAdapter() {}
+
+    public static Schema withIndex(Schema unresolvedSchema, List<List<String>> indexes) {
+        Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema);
+        for (List<String> index : indexes) {
+            newSchemaBuilder.index(index);
+        }
+        return newSchemaBuilder.build();
+    }
+
+    public static boolean supportIndex() {
+        return true;
+    }
+}
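
For illustration, a hedged usage sketch of the adapter above (the example class and column names are made-up, not from this commit): it attaches the primary key and an additional lookup column as indexes, guarded by supportIndex() for Flink versions whose Schema.Builder lacks index support.

    import org.apache.fluss.flink.adapter.SchemaAdapter;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;

    import java.util.Arrays;
    import java.util.Collections;

    /** Hypothetical example class; column names are illustrative only. */
    public class SchemaAdapterExample {

        public static void main(String[] args) {
            Schema base =
                    Schema.newBuilder()
                            .column("order_id", DataTypes.BIGINT())
                            .column("customer_id", DataTypes.BIGINT())
                            .primaryKey("order_id")
                            .build();

            // Declare the primary key and an extra lookup column as indexes, but only
            // on Flink versions where Schema.Builder#index exists.
            Schema schema =
                    SchemaAdapter.supportIndex()
                            ? SchemaAdapter.withIndex(
                                    base,
                                    Arrays.asList(
                                            Collections.singletonList("order_id"),
                                            Collections.singletonList("customer_id")))
                            : base;
            System.out.println(schema);
        }
    }
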
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
 
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
similarity index 52%
rename from 
fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
rename to 
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
index 076dcb86c..8d6e51250 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -17,30 +17,27 @@
 
 package org.apache.fluss.flink.adapter;
 
-import org.apache.flink.util.MultipleParameterTool;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
-import java.util.Map;
+import java.io.IOException;
 
 /**
- * An adapter for Flink {@link MultipleParameterTool} class. The {@link MultipleParameterTool} is
- * moved to a new package since Flink 2.x, so this adapter helps to bridge compatibility for
- * different Flink versions.
+ * Flink sink adapter that hides the version-specific createWriter method signatures.
  *
  * <p>TODO: remove this class once support for the Flink 1.x series is dropped.
  */
-public class MultipleParameterToolAdapter {
+public abstract class SinkAdapter<InputT> implements Sink<InputT> {
 
-    private MultipleParameterToolAdapter() {}
-
-    private MultipleParameterTool multipleParameterTool;
-
-    public static MultipleParameterToolAdapter fromArgs(String[] args) {
-        MultipleParameterToolAdapter adapter = new MultipleParameterToolAdapter();
-        adapter.multipleParameterTool = MultipleParameterTool.fromArgs(args);
-        return adapter;
+    @Override
+    public SinkWriter<InputT> createWriter(WriterInitContext writerInitContext) throws IOException {
+        return createWriter(
+                writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup());
     }
 
-    public Map<String, String> toMap() {
-        return this.multipleParameterTool.toMap();
-    }
+    protected abstract SinkWriter<InputT> createWriter(
+            MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup);
 }
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 93%
rename from 
fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
rename to 
fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index f13f71331..d5aca2d53 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,4 +16,4 @@
 # limitations under the License.
 #
 
-org.apache.fluss.flink.catalog.Flink21CatalogFactory
\ No newline at end of file
+org.apache.fluss.flink.catalog.FlinkCatalogFactory
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java
similarity index 87%
copy from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
copy to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java
index f3922bcbd..4d06fe901 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.adapter;
 
-/** Test for {@link MultipleParameterToolAdapter} in Flink 2.1. */
-public class Flink21MultipleParameterToolTest extends FlinkMultipleParameterToolTest {}
+/** Test for {@link MultipleParameterToolAdapter} in Flink 2.2. */
+public class Flink22MultipleParameterToolTest extends FlinkMultipleParameterToolTest {}
diff --git 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
new file mode 100644
index 000000000..f868a84db
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+/**
+ * Adapter for {@link ResolvedCatalogMaterializedTable}, whose constructor changed incompatibly in
+ * Flink 2.2. The constructor is only used in tests.
+ *
+ * <p>TODO: remove it once <a href="https://issues.apache.org/jira/browse/FLINK-38532">FLINK-38532</a>
+ * is fixed.
+ */
+public class ResolvedCatalogMaterializedTableAdapter {
+
+    public static ResolvedCatalogMaterializedTable create(
+            CatalogMaterializedTable origin,
+            ResolvedSchema resolvedSchema,
+            CatalogMaterializedTable.RefreshMode refreshMode,
+            IntervalFreshness freshness) {
+        return new ResolvedCatalogMaterializedTable(origin, resolvedSchema, refreshMode, freshness);
+    }
+}
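
For illustration, a minimal hedged sketch of how a test might route the version-sensitive constructor call through this adapter (the helper name, refresh mode, and freshness value are assumptions, not taken from this commit):

    import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter;

    import org.apache.flink.table.catalog.CatalogMaterializedTable;
    import org.apache.flink.table.catalog.IntervalFreshness;
    import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
    import org.apache.flink.table.catalog.ResolvedSchema;

    /** Hypothetical test helper; keeps the constructor call behind the adapter. */
    public class MaterializedTableTestUtil {

        static ResolvedCatalogMaterializedTable resolve(
                CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
            // Refresh mode and freshness below are illustrative values.
            return ResolvedCatalogMaterializedTableAdapter.create(
                    origin,
                    resolvedSchema,
                    CatalogMaterializedTable.RefreshMode.CONTINUOUS,
                    IntervalFreshness.ofMinute("5"));
        }
    }
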
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
similarity index 91%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
index 62bf5b9aa..b87965727 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
@@ -21,31 +21,12 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** IT case for catalog in Flink 2.1. */
-public class Flink21CatalogITCase extends FlinkCatalogITCase {
-
-    @BeforeAll
-    static void beforeAll() {
-        FlinkCatalogITCase.beforeAll();
-
-        // close the old one and open a new one later
-        catalog.close();
-
-        catalog =
-                new Flink21Catalog(
-                        catalog.catalogName,
-                        catalog.defaultDatabase,
-                        catalog.bootstrapServers,
-                        catalog.classLoader,
-                        catalog.securityConfigs,
-                        catalog.lakeCatalogPropertiesSupplier);
-        catalog.open();
-    }
+/** IT case for catalog in Flink 2.2. */
+public class Flink22CatalogITCase extends FlinkCatalogITCase {
 
     @Test
     void testGetTableWithIndex() throws Exception {
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java
similarity index 73%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java
index d6aa6ef56..bff2b77d6 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.fluss.flink.catalog;
 
-import org.apache.fluss.flink.lake.LakeFlinkCatalog;
-
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
@@ -28,24 +26,8 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import java.util.Arrays;
 import java.util.Collections;
 
-/** Test for {@link Flink21Catalog}. */
-public class FlinkCatalog21Test extends FlinkCatalogTest {
-
-    @Override
-    protected FlinkCatalog initCatalog(
-            String catalogName,
-            String databaseName,
-            String bootstrapServers,
-            LakeFlinkCatalog lakeFlinkCatalog) {
-        return new Flink21Catalog(
-                catalogName,
-                databaseName,
-                bootstrapServers,
-                Thread.currentThread().getContextClassLoader(),
-                Collections.emptyMap(),
-                Collections::emptyMap,
-                lakeFlinkCatalog);
-    }
+/** Test for {@link FlinkCatalog}. */
+public class Flink22CatalogTest extends FlinkCatalogTest {
 
     protected ResolvedSchema createSchema() {
         return new ResolvedSchema(
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java
similarity index 88%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java
index ee3041996..36240466c 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.catalog;
 
-/** IT case for materialized table in Flink 2.1. */
-public class Flink21MaterializedTableITCase extends MaterializedTableITCase {}
+/** IT case for materialized table in Flink 2.2. */
+public class Flink22MaterializedTableITCase extends MaterializedTableITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java
similarity index 88%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java
index 961c69d2f..37d795e0b 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.metrics;
 
-/** IT case for metrics in Flink 2.1. */
-public class Flink21MetricsITCase extends FlinkMetricsITCase {}
+/** IT case for metrics in Flink 2.2. */
+public class Flink22MetricsITCase extends FlinkMetricsITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java
similarity index 88%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java
index c95c47f0f..84612dc94 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.procedure;
 
-/** IT case for procedure in Flink 2.1. */
-public class Flink21ProcedureITCase extends FlinkProcedureITCase {}
+/** IT case for procedure in Flink 2.2. */
+public class Flink22ProcedureITCase extends FlinkProcedureITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
similarity index 88%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
index 66aefab8e..192be6b90 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.security.acl;
 
-/** IT case for authorization in Flink 2.1. */
-public class Flink21AuthorizationITCase extends FlinkAuthorizationITCase {}
+/** IT case for authorization in Flink 2.2. */
+public class Flink22AuthorizationITCase extends FlinkAuthorizationITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
similarity index 87%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
index 7b2ed44b2..cbb1f9a96 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.sink;
 
-/** Integration tests for Array type support in Flink 2.1. */
-public class Flink21ComplexTypeITCase extends FlinkComplexTypeITCase {}
+/** Integration tests for Array type support in Flink 2.2. */
+public class Flink22ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
similarity index 87%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
index b040476f4..9aa830096 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.sink;
 
-/** IT case for {@link FlinkTableSink} in Flink 2.1. */
-public class Flink21TableSinkITCase extends FlinkTableSinkITCase {}
+/** IT case for {@link FlinkTableSink} in Flink 2.2. */
+public class Flink22TableSinkITCase extends FlinkTableSinkITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java
similarity index 88%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java
index a881b6b31..f65966ea6 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.source;
 
-/** IT case for batch source in Flink 2.1. */
-public class Flink21TableSourceBatchITCase extends FlinkTableSourceBatchITCase {}
+/** IT case for batch source in Flink 2.2. */
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java
similarity index 87%
rename from 
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java
rename to 
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java
index 630470f28..d909bc536 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java
@@ -17,5 +17,5 @@
 
 package org.apache.fluss.flink.source;
 
-/** IT case for source failover and recovery in Flink 2.1. */
-public class Flink21TableSourceFailOverITCase extends FlinkTableSourceFailOverITCase {}
+/** IT case for source failover and recovery in Flink 2.2. */
+public class Flink22TableSourceFailOverITCase extends FlinkTableSourceFailOverITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
new file mode 100644
index 000000000..725372048
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.2. */
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
+
+    @Test
+    void testDeltaJoin() throws Exception {
+        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+        // to query the results of the sink table
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+        String leftTableName = "left_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                // currently, delta join only supports append-only sources
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L),
+                        row(4, "v4", 400L, 4, 40000L));
+        // write records
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                // currently, delta join only supports append-only sources
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v3", 200L, 2, 20000L),
+                        row(3, "v4", 300L, 4, 30000L),
+                        row(4, "v4", 500L, 4, 50000L));
+        // write records
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 = 
c2 AND d1 = d2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", sinkTableName)).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+                        "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+                        "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+                        "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
+    @Test
+    void testDeltaJoinWithProjectionAndFilter() throws Exception {
+        // start two jobs for this test: one for DML involving the delta join, and the other for DQL
+        // to query the results of the sink table
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+        String leftTableName = "left_table_proj";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " b1 varchar, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " e1 bigint, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v2", 200L, 2, 20000L),
+                        row(3, "v1", 300L, 3, 30000L));
+        TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+        writeRows(conn, leftTablePath, rows1, false);
+
+        String rightTableName = "right_table_proj";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " b2 varchar, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " e2 bigint, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 =
+                Arrays.asList(
+                        row(1, "v1", 100L, 1, 10000L),
+                        row(2, "v3", 200L, 2, 20000L),
+                        row(3, "v4", 300L, 4, 30000L));
+        TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+        writeRows(conn, rightTablePath, rows2, false);
+
+        String sinkTableName = "sink_table_proj";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " c1 bigint, "
+                                + " a2 int, "
+                                + " primary key (c1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        // Test with projection and filter
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN 
%s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
+        List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200, 
2]", "+U[2, 200, 2]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
+    @Test
+    void testDeltaJoinWithLookupCache() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+        String leftTableName = "left_table_cache";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " c1 bigint, "
+                                + " d1 int, "
+                                + " primary key (c1, d1) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c1', "
+                                + " 'table.merge-engine' = 'first_row' "
+                                + ")",
+                        leftTableName));
+        List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
+        writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
+
+        String rightTableName = "right_table_cache";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " a2 int, "
+                                + " c2 bigint, "
+                                + " d2 int, "
+                                + " primary key (c2, d2) NOT ENFORCED"
+                                + ") with ("
+                                + " 'connector' = 'fluss', "
+                                + " 'bucket.key' = 'c2', "
+                                + " 'table.merge-engine' = 'first_row', "
+                                + " 'lookup.cache' = 'partial', "
+                                + " 'lookup.partial-cache.max-rows' = '100' "
+                                + ")",
+                        rightTableName));
+        List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
+        writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2, false);
+
+        String sinkTableName = "sink_table_cache";
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ( "
+                                + " a1 int, "
+                                + " a2 int, "
+                                + " primary key (a1) NOT ENFORCED" // Dummy PK
+                                + ") with ("
+                                + " 'connector' = 'fluss' "
+                                + ")",
+                        sinkTableName));
+
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+                        OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+        String sql =
+                String.format(
+                        "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1 
INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
+                        sinkTableName, leftTableName, rightTableName);
+
+        assertThat(tEnv.explainSql(sql))
+                .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2) 
AND (d1 = d2))]");
+
+        tEnv.executeSql(sql);
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(String.format("select * from %s", 
sinkTableName)).collect();
+        List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1, 
1]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+}
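
Note: the new Flink22TableSourceITCase tests above all follow the same delta-join pattern. As a quick orientation, the sketch below (not part of the commit; table and column names are illustrative and assume a streaming TableEnvironment with the Fluss catalog already registered) shows the minimal steps each test repeats: force the delta-join strategy, then run an INSERT ... SELECT that joins two Fluss primary-key tables on keys covering their bucket keys.

    // Illustrative sketch only, mirroring the test pattern above.
    tEnv.getConfig()
            .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
                    OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
    String sql =
            "INSERT INTO sink_t SELECT l.a1, r.a2 FROM left_t AS l "
                    + "INNER JOIN right_t AS r ON l.c1 = r.c2 AND l.d1 = r.d2";
    // The explained plan should contain a DeltaJoin node instead of a regular streaming join.
    System.out.println(tEnv.explainSql(sql));
    tEnv.executeSql(sql);
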
diff --git a/fluss-flink/fluss-flink-2.1/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-flink/fluss-flink-2.2/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
similarity index 100%
rename from fluss-flink/fluss-flink-2.1/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
rename to fluss-flink/fluss-flink-2.2/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
diff --git a/fluss-flink/fluss-flink-2.1/src/test/resources/log4j2-test.properties b/fluss-flink/fluss-flink-2.2/src/test/resources/log4j2-test.properties
similarity index 100%
rename from fluss-flink/fluss-flink-2.1/src/test/resources/log4j2-test.properties
rename to fluss-flink/fluss-flink-2.2/src/test/resources/log4j2-test.properties
diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
similarity index 55%
rename from fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
index f3922bcbd..ad48cc99e 100644
--- a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
@@ -17,5 +17,27 @@
 
 package org.apache.fluss.flink.adapter;
 
-/** Test for {@link MultipleParameterToolAdapter} in flink 2.1. */
-public class Flink21MultipleParameterToolTest extends FlinkMultipleParameterToolTest {}
+import org.apache.flink.table.api.Schema;
+
+import java.util.List;
+
+/**
+ * An adapter for building a schema with index definitions.
+ *
+ * <p>TODO: remove this class once the Flink 1.x series is no longer supported.
+ */
+public class SchemaAdapter {
+    private SchemaAdapter() {}
+
+    public static Schema withIndex(Schema unresolvedSchema, List<List<String>> indexes) {
+        Schema.Builder newSchemaBuilder = Schema.newBuilder().fromSchema(unresolvedSchema);
+        if (!indexes.isEmpty()) {
+            throw new UnsupportedOperationException("Index is not supported.");
+        }
+        return newSchemaBuilder.build();
+    }
+
+    public static boolean supportIndex() {
+        return false;
+    }
+}
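
Note: this common SchemaAdapter is the Flink 1.x fallback: supportIndex() reports false and withIndex() only accepts an empty index list. A minimal usage sketch (hypothetical caller code, assuming the usual org.apache.flink.table.api.Schema and java.util imports; it mirrors how FlinkCatalog guards the call in the hunk further below):

    Schema unresolved =
            Schema.newBuilder().column("c1", "BIGINT NOT NULL").primaryKey("c1").build();
    List<List<String>> indexes = new ArrayList<>();
    if (SchemaAdapter.supportIndex()) {
        // only populated on Flink versions whose Schema carries index metadata
        indexes.add(Collections.singletonList("c1"));
    }
    // With an empty list this is effectively a copy; a non-empty list would throw here.
    Schema result = SchemaAdapter.withIndex(unresolved, indexes);
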
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
index 72a4540ea..d5a4ff0aa 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
@@ -29,6 +29,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 /**
  * Adapter for {@link SingleThreadMultiplexSourceReaderBase}.TODO: remove it until not supported in
  * flink 1.18.
+ *
+ * <p>TODO: remove this class once the Flink 1.x series is no longer supported.
  */
 public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
                 E, T, SplitT extends SourceSplit, SplitStateT>
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
new file mode 100644
index 000000000..5f7e9e76c
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+
+/**
+ * Flink sink adapter that hides the version-specific differences of the createWriter method.
+ *
+ * <p>TODO: remove this class once the Flink 1.x series is no longer supported.
+ */
+public abstract class SinkAdapter<InputT> implements Sink<InputT> {
+
+    @Override
+    public SinkWriter<InputT> createWriter(InitContext initContext) throws IOException {
+        return createWriter(initContext.getMailboxExecutor(), initContext.metricGroup());
+    }
+
+    @Override
+    public SinkWriter<InputT> createWriter(WriterInitContext writerInitContext) throws IOException {
+        return createWriter(
+                writerInitContext.getMailboxExecutor(), writerInitContext.metricGroup());
+    }
+
+    protected abstract SinkWriter<InputT> createWriter(
+            MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup);
+}
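
Note: the adapter routes both the deprecated InitContext variant and the Flink 2.x WriterInitContext variant of createWriter to a single version-neutral method. A rough sketch of a subclass is shown below (ExampleSink and ExampleSinkWriter are hypothetical names for illustration; FlinkSink, changed further below in this commit, is the real subclass):

    public class ExampleSink extends SinkAdapter<RowData> {
        @Override
        protected SinkWriter<RowData> createWriter(
                MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup) {
            // Subclasses only deal with the executor and metric group; the adapter
            // extracts both from whichever init-context type the runtime provides.
            return new ExampleSinkWriter(mailboxExecutor, metricGroup);
        }
    }
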
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index fb7a41157..37e9f7c5e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -38,6 +38,7 @@ import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.IOUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -86,6 +87,8 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.FlinkConnectorOptions.ALTER_DISALLOW_OPTIONS;
+import static org.apache.fluss.flink.adapter.SchemaAdapter.supportIndex;
+import static org.apache.fluss.flink.adapter.SchemaAdapter.withIndex;
 import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionAlreadyExists;
 import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionInvalid;
 import static org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionNotExist;
@@ -344,7 +347,11 @@ public class FlinkCatalog extends AbstractCatalog {
                 }
             }
             if (CatalogBaseTable.TableKind.TABLE == catalogBaseTable.getTableKind()) {
-                return ((CatalogTable) catalogBaseTable).copy(newOptions);
+                CatalogTable table = ((CatalogTable) catalogBaseTable).copy(newOptions);
+                if (supportIndex()) {
+                    table = wrapWithIndexes(table, tableInfo);
+                }
+                return table;
             } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE
                     == catalogBaseTable.getTableKind()) {
                 return ((CatalogMaterializedTable) catalogBaseTable).copy(newOptions);
@@ -806,4 +813,55 @@ public class FlinkCatalog extends AbstractCatalog {
         }
         return lakeCatalogProperties;
     }
+
+    private CatalogTable wrapWithIndexes(CatalogTable table, TableInfo tableInfo) {
+
+        Optional<Schema.UnresolvedPrimaryKey> pkOp = table.getUnresolvedSchema().getPrimaryKey();
+        // If there is no pk, return directly.
+        if (!pkOp.isPresent()) {
+            return table;
+        }
+
+        List<List<String>> indexes = new ArrayList<>();
+        // Pk is always an index.
+        indexes.add(pkOp.get().getColumnNames());
+
+        // Judge whether we can do prefix lookup.
+        List<String> bucketKeys = tableInfo.getBucketKeys();
+        // For a partitioned table, the physical primary key is the primary key that excludes the
+        // partition key
+        List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
+        List<String> indexKeys = new ArrayList<>();
+        if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
+            indexKeys.addAll(bucketKeys);
+            if (tableInfo.isPartitioned()) {
+                indexKeys.addAll(tableInfo.getPartitionKeys());
+            }
+        }
+
+        if (!indexKeys.isEmpty()) {
+            indexes.add(indexKeys);
+        }
+        return CatalogTable.newBuilder()
+                .schema(withIndex(table.getUnresolvedSchema(), indexes))
+                .comment(table.getComment())
+                .partitionKeys(table.getPartitionKeys())
+                .options(table.getOptions())
+                .snapshot(table.getSnapshot().orElse(null))
+                .distribution(table.getDistribution().orElse(null))
+                .build();
+    }
+
+    private static boolean isPrefixList(List<String> fullList, List<String> prefixList) {
+        if (fullList.size() <= prefixList.size()) {
+            return false;
+        }
+
+        for (int i = 0; i < prefixList.size(); i++) {
+            if (!fullList.get(i).equals(prefixList.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
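
Note: the new wrapWithIndexes logic exposes a second, prefix-lookup index only when the bucket keys form a strict prefix of the physical primary key (plus the partition keys, if any). A standalone sketch of that rule follows; the helper is copied from isPrefixList above, while the class name and example key values are illustrative only.

    import java.util.Arrays;
    import java.util.List;

    public class PrefixLookupRule {
        static boolean isPrefixList(List<String> fullList, List<String> prefixList) {
            if (fullList.size() <= prefixList.size()) {
                return false;
            }
            for (int i = 0; i < prefixList.size(); i++) {
                if (!fullList.get(i).equals(prefixList.get(i))) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            // physical PK (c1, d1) with bucket key (c1): strict prefix -> extra index [c1]
            System.out.println(isPrefixList(Arrays.asList("c1", "d1"), Arrays.asList("c1"))); // true
            // PK equal to the bucket key: not a strict prefix -> only the primary-key index
            System.out.println(isPrefixList(Arrays.asList("c1"), Arrays.asList("c1"))); // false
        }
    }
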
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
index a35d48117..fcdf9ab42 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.sink;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.adapter.SinkAdapter;
 import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
 import org.apache.fluss.flink.sink.writer.AppendSinkWriter;
 import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
@@ -27,9 +28,8 @@ import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.TablePath;
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,7 +37,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
@@ -45,7 +44,7 @@ import static org.apache.fluss.flink.sink.FlinkStreamPartitioner.partition;
 import static org.apache.fluss.flink.utils.FlinkConversions.toFlussRowType;
 
 /** Flink sink for Fluss. */
-class FlinkSink<InputT> implements Sink<InputT>, SupportsPreWriteTopology<InputT> {
+class FlinkSink<InputT> extends SinkAdapter<InputT> implements SupportsPreWriteTopology<InputT> {
 
     private static final long serialVersionUID = 1L;
 
@@ -55,20 +54,11 @@ class FlinkSink<InputT> implements Sink<InputT>, SupportsPreWriteTopology<InputT
         this.builder = builder;
     }
 
-    @Deprecated
     @Override
-    public SinkWriter<InputT> createWriter(InitContext context) throws IOException {
-        FlinkSinkWriter<InputT> flinkSinkWriter =
-                builder.createWriter(context.getMailboxExecutor());
-        flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
-        return flinkSinkWriter;
-    }
-
-    @Override
-    public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
-        FlinkSinkWriter<InputT> flinkSinkWriter =
-                builder.createWriter(context.getMailboxExecutor());
-        flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
+    protected SinkWriter<InputT> createWriter(
+            MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup) {
+        FlinkSinkWriter<InputT> flinkSinkWriter = builder.createWriter(mailboxExecutor);
+        flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(metricGroup));
         return flinkSinkWriter;
     }
 
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
new file mode 100644
index 000000000..7a018bc02
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+/**
+ * Adapter for {@link ResolvedCatalogMaterializedTable} because its constructor changed incompatibly
+ * in Flink 2.2. The constructor is only used in tests.
+ *
+ * <p>TODO: remove it once <a href="https://issues.apache.org/jira/browse/FLINK-38532">...</a> is
+ * fixed.
+ */
+public class ResolvedCatalogMaterializedTableAdapter {
+
+    public static ResolvedCatalogMaterializedTable create(
+            CatalogMaterializedTable origin,
+            ResolvedSchema resolvedSchema,
+            CatalogMaterializedTable.RefreshMode refreshMode,
+            IntervalFreshness freshness) {
+        return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 9b915a74d..6c65544c4 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.IllegalConfigurationException;
 import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter;
 import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.utils.FlinkConversionsTest;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -40,7 +41,6 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
@@ -158,7 +158,11 @@ class FlinkCatalogTest {
                         .refreshMode(refreshMode)
                         .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
                         .build();
-        return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
+        return ResolvedCatalogMaterializedTableAdapter.create(
+                origin,
+                resolvedSchema,
+                refreshMode,
+                IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND));
     }
 
     protected FlinkCatalog initCatalog(
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index 8c0adb3a2..03f853f61 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -359,11 +359,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         String query = String.format("SELECT COUNT(*) FROM %s", tableName);
         assertThat(tEnv.explainSql(query))
                 .contains(
-                        String.format(
-                                "TableSourceScan(table=[[testcatalog, 
defaultdb, %s, project=[id], "
-                                        + "aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], "
-                                        + "fields=[count1$0])",
-                                tableName));
+                        "aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
         List<String> collected = collectRowsWithTimeout(iterRows, 1);
         List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows));
@@ -479,7 +475,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         return tableName;
     }
 
-    private String preparePartitionedLogTable() throws Exception {
+    protected String preparePartitionedLogTable() throws Exception {
         String tableName = String.format("test_partitioned_log_table_%s", 
RandomUtils.nextInt());
         tEnv.executeSql(
                 String.format(
diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml
index 8bd4c11ff..d759f5356 100644
--- a/fluss-flink/pom.xml
+++ b/fluss-flink/pom.xml
@@ -36,7 +36,7 @@
         <module>fluss-flink-1.20</module>
         <module>fluss-flink-1.19</module>
         <module>fluss-flink-1.18</module>
-        <module>fluss-flink-2.1</module>
+        <module>fluss-flink-2.2</module>
         <module>fluss-flink-tiering</module>
     </modules>
 
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index f82aa3321..7f0a4cc9b 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -70,11 +70,12 @@
 
         <dependency>
             <groupId>org.apache.fluss</groupId>
-            <artifactId>fluss-flink-2.1</artifactId>
+            <artifactId>fluss-flink-2.2</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
 
+
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-flink-1.20</artifactId>
