This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 684a43de2 [flink] Support Flink 2.2 and remove Flink 2.1 (#2167)
684a43de2 is described below
commit 684a43de2388b28e0cc7a427471e3e17423b1c66
Author: vamossagar12 <[email protected]>
AuthorDate: Wed Dec 24 16:48:38 2025 +0530
[flink] Support Flink 2.2 and remove Flink 2.1 (#2167)
---------
Co-authored-by: Hongshun Wang <[email protected]>
Co-authored-by: Jark Wu <[email protected]>
---
.github/workflows/stage.sh | 2 +-
.../apache/fluss/flink/adapter/SinkAdapter.java} | 29 +-
.../org/apache/flink/api/connector/sink2/Sink.java | 61 ----
.../apache/fluss/flink/catalog/Flink21Catalog.java | 136 ---------
.../fluss/flink/catalog/Flink21CatalogFactory.java | 35 ---
.../flink/source/Flink21TableSourceITCase.java | 147 ----------
.../{fluss-flink-2.1 => fluss-flink-2.2}/pom.xml | 22 +-
.../adapter/MultipleParameterToolAdapter.java | 0
.../apache/fluss/flink/adapter/SchemaAdapter.java} | 28 +-
.../apache/fluss/flink/adapter/SinkAdapter.java} | 31 +--
.../org.apache.flink.table.factories.Factory | 2 +-
.../adapter/Flink22MultipleParameterToolTest.java} | 4 +-
.../ResolvedCatalogMaterializedTableAdapter.java | 41 +++
.../fluss/flink/catalog/Flink22CatalogITCase.java} | 23 +-
.../fluss/flink/catalog/Flink22CatalogTest.java} | 22 +-
.../catalog/Flink22MaterializedTableITCase.java} | 4 +-
.../fluss/flink/metrics/Flink22MetricsITCase.java} | 4 +-
.../flink/procedure/Flink22ProcedureITCase.java} | 4 +-
.../security/acl/Flink22AuthorizationITCase.java} | 4 +-
.../flink/sink/Flink22ComplexTypeITCase.java} | 4 +-
.../fluss/flink/sink/Flink22TableSinkITCase.java} | 4 +-
.../source/Flink22TableSourceBatchITCase.java} | 4 +-
.../source/Flink22TableSourceFailOverITCase.java} | 4 +-
.../flink/source/Flink22TableSourceITCase.java | 309 +++++++++++++++++++++
.../org.junit.jupiter.api.extension.Extension | 0
.../src/test/resources/log4j2-test.properties | 0
.../apache/fluss/flink/adapter/SchemaAdapter.java} | 26 +-
...ngleThreadMultiplexSourceReaderBaseAdapter.java | 2 +
.../apache/fluss/flink/adapter/SinkAdapter.java | 48 ++++
.../apache/fluss/flink/catalog/FlinkCatalog.java | 60 +++-
.../org/apache/fluss/flink/sink/FlinkSink.java | 24 +-
.../ResolvedCatalogMaterializedTableAdapter.java | 41 +++
.../fluss/flink/catalog/FlinkCatalogTest.java | 8 +-
.../flink/source/FlinkTableSourceBatchITCase.java | 8 +-
fluss-flink/pom.xml | 2 +-
fluss-test-coverage/pom.xml | 3 +-
36 files changed, 635 insertions(+), 511 deletions(-)
diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index 49a3ed57e..18587dc29 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -24,7 +24,7 @@ STAGE_LAKE="lake"
MODULES_FLINK="\
fluss-flink,\
fluss-flink/fluss-flink-common,\
-fluss-flink/fluss-flink-2.1,\
+fluss-flink/fluss-flink-2.2,\
fluss-flink/fluss-flink-1.20,\
"
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
similarity index 54%
copy from
fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
copy to
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
index 076dcb86c..575bfdfbd 100644
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
+++
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -17,30 +17,25 @@
package org.apache.fluss.flink.adapter;
-import org.apache.flink.util.MultipleParameterTool;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import java.util.Map;
+import java.io.IOException;
/**
- * An adapter for Flink {@link MultipleParameterTool} class. The {@link
MultipleParameterTool} is
- * moved to a new package since Flink 2.x, so this adapter helps to bridge
compatibility for
- * different Flink versions.
+ * Flink sink adapter which hide the different version of createWriter method.
*
* <p>TODO: remove this class when no longer support all the Flink 1.x series.
*/
-public class MultipleParameterToolAdapter {
+public abstract class SinkAdapter<InputT> implements Sink<InputT> {
- private MultipleParameterToolAdapter() {}
-
- private MultipleParameterTool multipleParameterTool;
-
- public static MultipleParameterToolAdapter fromArgs(String[] args) {
- MultipleParameterToolAdapter adapter = new
MultipleParameterToolAdapter();
- adapter.multipleParameterTool = MultipleParameterTool.fromArgs(args);
- return adapter;
+ @Override
+ public SinkWriter<InputT> createWriter(InitContext initContext) throws
IOException {
+ return createWriter(initContext.getMailboxExecutor(),
initContext.metricGroup());
}
- public Map<String, String> toMap() {
- return this.multipleParameterTool.toMap();
- }
+ protected abstract SinkWriter<InputT> createWriter(
+ MailboxExecutor mailboxExecutor, SinkWriterMetricGroup
metricGroup);
}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
deleted file mode 100644
index 8cab1cb53..000000000
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Placeholder class to resolve compatibility issues. This placeholder class
can be removed once we
- * drop the support of Flink v1.18 which requires the legacy InitContext
interface.
- */
-public interface Sink<InputT> extends Serializable {
-
- /**
- * Creates a {@link SinkWriter}.
- *
- * @param context the runtime context.
- * @return A sink writer.
- * @throws IOException for any failure during creation.
- */
- SinkWriter<InputT> createWriter(WriterInitContext context) throws
IOException;
-
- /** The interface exposes some runtime info for creating a {@link
SinkWriter}. */
- interface InitContext {
-
- /**
- * Returns the mailbox executor that allows to execute {@link
Runnable}s inside the task
- * thread in between record processing.
- *
- * <p>Note that this method should not be used per-record for
performance reasons in the
- * same way as records should not be sent to the external system
individually. Rather,
- * implementers are expected to batch records and only enqueue a
single {@link Runnable} per
- * batch to handle the result.
- */
- MailboxExecutor getMailboxExecutor();
-
- /**
- * @return The metric group this writer belongs to.
- */
- SinkWriterMetricGroup metricGroup();
- }
-}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
deleted file mode 100644
index d14e3af6f..000000000
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.catalog;
-
-import org.apache.fluss.flink.lake.LakeFlinkCatalog;
-import org.apache.fluss.metadata.TableInfo;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-/** A {@link FlinkCatalog} used for Flink 2.1. */
-public class Flink21Catalog extends FlinkCatalog {
-
- public Flink21Catalog(
- String name,
- String defaultDatabase,
- String bootstrapServers,
- ClassLoader classLoader,
- Map<String, String> securityConfigs,
- Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
- super(
- name,
- defaultDatabase,
- bootstrapServers,
- classLoader,
- securityConfigs,
- lakeCatalogPropertiesSupplier);
- }
-
- @VisibleForTesting
- public Flink21Catalog(
- String name,
- String defaultDatabase,
- String bootstrapServers,
- ClassLoader classLoader,
- Map<String, String> securityConfigs,
- Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
- LakeFlinkCatalog lakeFlinkCatalog) {
- super(
- name,
- defaultDatabase,
- bootstrapServers,
- classLoader,
- securityConfigs,
- lakeCatalogPropertiesSupplier,
- lakeFlinkCatalog);
- }
-
- @Override
- public CatalogBaseTable getTable(ObjectPath objectPath)
- throws TableNotExistException, CatalogException {
- CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
- if (!(catalogBaseTable instanceof CatalogTable)
- || objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
- return catalogBaseTable;
- }
-
- CatalogTable table = (CatalogTable) catalogBaseTable;
- Optional<Schema.UnresolvedPrimaryKey> pkOp =
table.getUnresolvedSchema().getPrimaryKey();
- // If there is no pk, return directly.
- if (pkOp.isEmpty()) {
- return table;
- }
-
- Schema.Builder newSchemaBuilder =
- Schema.newBuilder().fromSchema(table.getUnresolvedSchema());
- // Pk is always an index.
- newSchemaBuilder.index(pkOp.get().getColumnNames());
-
- // Judge whether we can do prefix lookup.
- TableInfo tableInfo =
connection.getTable(toTablePath(objectPath)).getTableInfo();
- List<String> bucketKeys = tableInfo.getBucketKeys();
- // For partition table, the physical primary key is the primary key
that excludes the
- // partition key
- List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
- List<String> indexKeys = new ArrayList<>();
- if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
- indexKeys.addAll(bucketKeys);
- if (tableInfo.isPartitioned()) {
- indexKeys.addAll(tableInfo.getPartitionKeys());
- }
- }
-
- if (!indexKeys.isEmpty()) {
- newSchemaBuilder.index(indexKeys);
- }
- return CatalogTable.newBuilder()
- .schema(newSchemaBuilder.build())
- .comment(table.getComment())
- .partitionKeys(table.getPartitionKeys())
- .options(table.getOptions())
- .snapshot(table.getSnapshot().orElse(null))
- .distribution(table.getDistribution().orElse(null))
- .build();
- }
-
- private static boolean isPrefixList(List<String> fullList, List<String>
prefixList) {
- if (fullList.size() <= prefixList.size()) {
- return false;
- }
-
- for (int i = 0; i < prefixList.size(); i++) {
- if (!fullList.get(i).equals(prefixList.get(i))) {
- return false;
- }
- }
- return true;
- }
-}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
deleted file mode 100644
index cff44ab86..000000000
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.catalog;
-
-/** A {@link FlinkCatalogFactory} used for Flink 2.1. */
-public class Flink21CatalogFactory extends FlinkCatalogFactory {
-
- @Override
- public FlinkCatalog createCatalog(Context context) {
- FlinkCatalog catalog = super.createCatalog(context);
- return new Flink21Catalog(
- catalog.catalogName,
- catalog.defaultDatabase,
- catalog.bootstrapServers,
- catalog.classLoader,
- catalog.securityConfigs,
- catalog.lakeCatalogPropertiesSupplier);
- }
-}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
deleted file mode 100644
index 9b1e908da..000000000
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceITCase.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.source;
-
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** IT case for {@link FlinkTableSource} in Flink 2.1. */
-public class Flink21TableSourceITCase extends FlinkTableSourceITCase {
-
- @Test
- void testDeltaJoin() throws Exception {
- // start two jobs for this test: one for DML involving the delta join,
and the other for DQL
- // to query the results of the sink table
-
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
-
- String leftTableName = "left_table";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " b1 varchar, "
- + " c1 bigint, "
- + " d1 int, "
- + " e1 bigint, "
- + " primary key (c1, d1) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c1', "
- // currently, delta join only support
append-only source
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- leftTableName));
- List<InternalRow> rows1 =
- Arrays.asList(
- row(1, "v1", 100L, 1, 10000L),
- row(2, "v2", 200L, 2, 20000L),
- row(3, "v1", 300L, 3, 30000L),
- row(4, "v4", 400L, 4, 40000L));
- // write records
- TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
- writeRows(conn, leftTablePath, rows1, false);
-
- String rightTableName = "right_table";
- tEnv.executeSql(
- String.format(
- "create table %s ("
- + " a2 int, "
- + " b2 varchar, "
- + " c2 bigint, "
- + " d2 int, "
- + " e2 bigint, "
- + " primary key (c2, d2) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss', "
- + " 'bucket.key' = 'c2', "
- // currently, delta join only support
append-only source
- + " 'table.merge-engine' = 'first_row' "
- + ")",
- rightTableName));
- List<InternalRow> rows2 =
- Arrays.asList(
- row(1, "v1", 100L, 1, 10000L),
- row(2, "v3", 200L, 2, 20000L),
- row(3, "v4", 300L, 4, 30000L),
- row(4, "v4", 500L, 4, 50000L));
- // write records
- TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
- writeRows(conn, rightTablePath, rows2, false);
-
- String sinkTableName = "sink_table";
- tEnv.executeSql(
- String.format(
- "create table %s ( "
- + " a1 int, "
- + " b1 varchar, "
- + " c1 bigint, "
- + " d1 int, "
- + " e1 bigint, "
- + " a2 int, "
- + " b2 varchar, "
- + " c2 bigint, "
- + " d2 int, "
- + " e2 bigint, "
- + " primary key (c1, d1, c2, d2) NOT ENFORCED"
- + ") with ("
- + " 'connector' = 'fluss' "
- + ")",
- sinkTableName));
-
- tEnv.getConfig()
- .set(
-
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
- OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
-
- String sql =
- String.format(
- "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 =
c2 AND d1 = d2",
- sinkTableName, leftTableName, rightTableName);
-
- assertThat(tEnv.explainSql(sql))
- .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2)
AND (d1 = d2))]");
-
- tEnv.executeSql(sql);
-
- CloseableIterator<Row> collected =
- tEnv.executeSql(String.format("select * from %s",
sinkTableName)).collect();
- List<String> expected =
- Arrays.asList(
- "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
- "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
- "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
- "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
- "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
- "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
- assertResultsIgnoreOrder(collected, expected, true);
- }
-}
diff --git a/fluss-flink/fluss-flink-2.1/pom.xml
b/fluss-flink/fluss-flink-2.2/pom.xml
similarity index 93%
rename from fluss-flink/fluss-flink-2.1/pom.xml
rename to fluss-flink/fluss-flink-2.2/pom.xml
index 5b5475f3d..c6527e282 100644
--- a/fluss-flink/fluss-flink-2.1/pom.xml
+++ b/fluss-flink/fluss-flink-2.2/pom.xml
@@ -26,11 +26,11 @@
<version>0.9-SNAPSHOT</version>
</parent>
- <artifactId>fluss-flink-2.1</artifactId>
- <name>Fluss : Engine Flink : 2.1 </name>
+ <artifactId>fluss-flink-2.2</artifactId>
+ <name>Fluss : Engine Flink : 2.2 </name>
<properties>
- <flink.major.version>2.1</flink.major.version>
- <flink.minor.version>2.1.1</flink.minor.version>
+ <flink.major.version>2.2</flink.major.version>
+ <flink.minor.version>2.2.0</flink.minor.version>
</properties>
<dependencies>
@@ -68,6 +68,20 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.minor.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.minor.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependency -->
<dependency>
<groupId>org.apache.fluss</groupId>
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
similarity index 100%
copy from
fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
copy to
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
diff --git
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
similarity index 53%
rename from
fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
rename to
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
index 6caa32d7c..8f6492bd1 100644
---
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
+++
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
@@ -15,7 +15,29 @@
* limitations under the License.
*/
-package org.apache.flink.api.connector.sink2;
+package org.apache.fluss.flink.adapter;
-/** Placeholder class to resolve compatibility issues. */
-public interface WriterInitContext extends Sink.InitContext {}
+import org.apache.flink.table.api.Schema;
+
+import java.util.List;
+
+/**
+ * An adapter for the schema with Index.
+ *
+ * <p>TODO: remove this class when no longer support all the Flink 1.x series.
+ */
+public class SchemaAdapter {
+ private SchemaAdapter() {}
+
+ public static Schema withIndex(Schema unresolvedSchema, List<List<String>>
indexes) {
+ Schema.Builder newSchemaBuilder =
Schema.newBuilder().fromSchema(unresolvedSchema);
+ for (List<String> index : indexes) {
+ newSchemaBuilder.index(index);
+ }
+ return newSchemaBuilder.build();
+ }
+
+ public static boolean supportIndex() {
+ return true;
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
similarity index 52%
rename from
fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
rename to
fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
index 076dcb86c..8d6e51250 100644
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java
+++
b/fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -17,30 +17,27 @@
package org.apache.fluss.flink.adapter;
-import org.apache.flink.util.MultipleParameterTool;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import java.util.Map;
+import java.io.IOException;
/**
- * An adapter for Flink {@link MultipleParameterTool} class. The {@link
MultipleParameterTool} is
- * moved to a new package since Flink 2.x, so this adapter helps to bridge
compatibility for
- * different Flink versions.
+ * Flink sink adapter which hide the different version of createWriter method.
*
* <p>TODO: remove this class when no longer support all the Flink 1.x series.
*/
-public class MultipleParameterToolAdapter {
+public abstract class SinkAdapter<InputT> implements Sink<InputT> {
- private MultipleParameterToolAdapter() {}
-
- private MultipleParameterTool multipleParameterTool;
-
- public static MultipleParameterToolAdapter fromArgs(String[] args) {
- MultipleParameterToolAdapter adapter = new
MultipleParameterToolAdapter();
- adapter.multipleParameterTool = MultipleParameterTool.fromArgs(args);
- return adapter;
+ @Override
+ public SinkWriter<InputT> createWriter(WriterInitContext
writerInitContext) throws IOException {
+ return createWriter(
+ writerInitContext.getMailboxExecutor(),
writerInitContext.metricGroup());
}
- public Map<String, String> toMap() {
- return this.multipleParameterTool.toMap();
- }
+ protected abstract SinkWriter<InputT> createWriter(
+ MailboxExecutor mailboxExecutor, SinkWriterMetricGroup
metricGroup);
}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 93%
rename from
fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
rename to
fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index f13f71331..d5aca2d53 100644
---
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/fluss-flink/fluss-flink-2.2/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,4 +16,4 @@
# limitations under the License.
#
-org.apache.fluss.flink.catalog.Flink21CatalogFactory
\ No newline at end of file
+org.apache.fluss.flink.catalog.FlinkCatalogFactory
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java
similarity index 87%
copy from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
copy to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java
index f3922bcbd..4d06fe901 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/Flink22MultipleParameterToolTest.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.adapter;
-/** Test for {@link MultipleParameterToolAdapter} in flink 2.1. */
-public class Flink21MultipleParameterToolTest extends
FlinkMultipleParameterToolTest {}
+/** Test for {@link MultipleParameterToolAdapter} in flink 2.2. */
+public class Flink22MultipleParameterToolTest extends
FlinkMultipleParameterToolTest {}
diff --git
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
new file mode 100644
index 000000000..f868a84db
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+/**
+ * Adapter for {@link ResolvedCatalogMaterializedTable} because the
constructor is compatibility in
+ * flink 2.2. However, this constructor only used in test.
+ *
+ * <p>TODO: remove it until <a
href="https://issues.apache.org/jira/browse/FLINK-38532">...</a> is
+ * fixed.
+ */
+public class ResolvedCatalogMaterializedTableAdapter {
+
+ public static ResolvedCatalogMaterializedTable create(
+ CatalogMaterializedTable origin,
+ ResolvedSchema resolvedSchema,
+ CatalogMaterializedTable.RefreshMode refreshMode,
+ IntervalFreshness freshness) {
+ return new ResolvedCatalogMaterializedTable(origin, resolvedSchema,
refreshMode, freshness);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
similarity index 91%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
index 62bf5b9aa..b87965727 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogITCase.java
@@ -21,31 +21,12 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
-/** IT case for catalog in Flink 2.1. */
-public class Flink21CatalogITCase extends FlinkCatalogITCase {
-
- @BeforeAll
- static void beforeAll() {
- FlinkCatalogITCase.beforeAll();
-
- // close the old one and open a new one later
- catalog.close();
-
- catalog =
- new Flink21Catalog(
- catalog.catalogName,
- catalog.defaultDatabase,
- catalog.bootstrapServers,
- catalog.classLoader,
- catalog.securityConfigs,
- catalog.lakeCatalogPropertiesSupplier);
- catalog.open();
- }
+/** IT case for catalog in Flink 2.2. */
+public class Flink22CatalogITCase extends FlinkCatalogITCase {
@Test
void testGetTableWithIndex() throws Exception {
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java
similarity index 73%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java
index d6aa6ef56..bff2b77d6 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22CatalogTest.java
@@ -17,8 +17,6 @@
package org.apache.fluss.flink.catalog;
-import org.apache.fluss.flink.lake.LakeFlinkCatalog;
-
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.DefaultIndex;
@@ -28,24 +26,8 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import java.util.Arrays;
import java.util.Collections;
-/** Test for {@link Flink21Catalog}. */
-public class FlinkCatalog21Test extends FlinkCatalogTest {
-
- @Override
- protected FlinkCatalog initCatalog(
- String catalogName,
- String databaseName,
- String bootstrapServers,
- LakeFlinkCatalog lakeFlinkCatalog) {
- return new Flink21Catalog(
- catalogName,
- databaseName,
- bootstrapServers,
- Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap(),
- Collections::emptyMap,
- lakeFlinkCatalog);
- }
+/** Test for {@link FlinkCatalog}. */
+public class Flink22CatalogTest extends FlinkCatalogTest {
protected ResolvedSchema createSchema() {
return new ResolvedSchema(
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java
similarity index 88%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java
index ee3041996..36240466c 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21MaterializedTableITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/catalog/Flink22MaterializedTableITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.catalog;
-/** IT case for materialized table in Flink 2.1. */
-public class Flink21MaterializedTableITCase extends MaterializedTableITCase {}
+/** IT case for materialized table in Flink 2.2. */
+public class Flink22MaterializedTableITCase extends MaterializedTableITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java
similarity index 88%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java
index 961c69d2f..37d795e0b 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/metrics/Flink21MetricsITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/metrics/Flink22MetricsITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.metrics;
-/** IT case for metrics in Flink 2.1. */
-public class Flink21MetricsITCase extends FlinkMetricsITCase {}
+/** IT case for metrics in Flink 2.2. */
+public class Flink22MetricsITCase extends FlinkMetricsITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java
similarity index 88%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java
index c95c47f0f..84612dc94 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/procedure/Flink21ProcedureITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/procedure/Flink22ProcedureITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.procedure;
-/** IT case for procedure in Flink 2.1. */
-public class Flink21ProcedureITCase extends FlinkProcedureITCase {}
+/** IT case for procedure in Flink 2.2. */
+public class Flink22ProcedureITCase extends FlinkProcedureITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
similarity index 88%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
index 66aefab8e..192be6b90 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/security/acl/Flink21AuthorizationITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/security/acl/Flink22AuthorizationITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.security.acl;
-/** IT case for authorization in Flink 2.1. */
-public class Flink21AuthorizationITCase extends FlinkAuthorizationITCase {}
+/** IT case for authorization in Flink 2.2. */
+public class Flink22AuthorizationITCase extends FlinkAuthorizationITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
similarity index 87%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
index 7b2ed44b2..cbb1f9a96 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22ComplexTypeITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.sink;
-/** Integration tests for Array type support in Flink 2.1. */
-public class Flink21ComplexTypeITCase extends FlinkComplexTypeITCase {}
+/** Integration tests for Array type support in Flink 2.2. */
+public class Flink22ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
similarity index 87%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
index b040476f4..9aa830096 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21TableSinkITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/sink/Flink22TableSinkITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.sink;
-/** IT case for {@link FlinkTableSink} in Flink 2.1. */
-public class Flink21TableSinkITCase extends FlinkTableSinkITCase {}
+/** IT case for {@link FlinkTableSink} in Flink 2.2. */
+public class Flink22TableSinkITCase extends FlinkTableSinkITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java
similarity index 88%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java
index a881b6b31..f65966ea6 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceBatchITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceBatchITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.source;
-/** IT case for batch source in Flink 2.1. */
-public class Flink21TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{}
+/** IT case for batch source in Flink 2.2. */
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java
similarity index 87%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java
rename to
fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java
index 630470f28..d909bc536 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/source/Flink21TableSourceFailOverITCase.java
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceFailOverITCase.java
@@ -17,5 +17,5 @@
package org.apache.fluss.flink.source;
-/** IT case for source failover and recovery in Flink 2.1. */
-public class Flink21TableSourceFailOverITCase extends
FlinkTableSourceFailOverITCase {}
+/** IT case for source failover and recovery in Flink 2.2. */
+public class Flink22TableSourceFailOverITCase extends
FlinkTableSourceFailOverITCase {}
diff --git
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
new file mode 100644
index 000000000..725372048
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22TableSourceITCase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.2. */
+public class Flink22TableSourceITCase extends FlinkTableSourceITCase {
+
+ @Test
+ void testDeltaJoin() throws Exception {
+ // start two jobs for this test: one for DML involving the delta join,
and the other for DQL
+ // to query the results of the sink table
+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
+
+ String leftTableName = "left_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ // currently, delta join only support
append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L),
+ row(4, "v4", 400L, 4, 40000L));
+ // write records
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ // currently, delta join only support
append-only source
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L),
+ row(4, "v4", 500L, 4, 50000L));
+ // write records
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c1, d1, c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT * FROM %s INNER JOIN %s ON c1 =
c2 AND d1 = d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2)
AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s",
sinkTableName)).collect();
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "-U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+U[1, v1, 100, 1, 10000, 1, v1, 100, 1, 10000]",
+ "+I[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "-U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]",
+ "+U[2, v2, 200, 2, 20000, 2, v3, 200, 2, 20000]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithProjectionAndFilter() throws Exception {
+ // start two jobs for this test: one for DML involving the delta join,
and the other for DQL
+ // to query the results of the sink table
+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
+
+ String leftTableName = "left_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " b1 varchar, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " e1 bigint, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v2", 200L, 2, 20000L),
+ row(3, "v1", 300L, 3, 30000L));
+ TablePath leftTablePath = TablePath.of(DEFAULT_DB, leftTableName);
+ writeRows(conn, leftTablePath, rows1, false);
+
+ String rightTableName = "right_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " b2 varchar, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " e2 bigint, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 =
+ Arrays.asList(
+ row(1, "v1", 100L, 1, 10000L),
+ row(2, "v3", 200L, 2, 20000L),
+ row(3, "v4", 300L, 4, 30000L));
+ TablePath rightTablePath = TablePath.of(DEFAULT_DB, rightTableName);
+ writeRows(conn, rightTablePath, rows2, false);
+
+ String sinkTableName = "sink_table_proj";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " a2 int, "
+ + " primary key (c1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ // Test with projection and filter
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT a1, c1, a2 FROM %s INNER JOIN
%s ON c1 = c2 AND d1 = d2 WHERE a1 > 1",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2)
AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s",
sinkTableName)).collect();
+ List<String> expected = Arrays.asList("+I[2, 200, 2]", "-U[2, 200,
2]", "+U[2, 200, 2]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+
+ @Test
+ void testDeltaJoinWithLookupCache() throws Exception {
+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
+
+ String leftTableName = "left_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " c1 bigint, "
+ + " d1 int, "
+ + " primary key (c1, d1) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c1', "
+ + " 'table.merge-engine' = 'first_row' "
+ + ")",
+ leftTableName));
+ List<InternalRow> rows1 = Arrays.asList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, leftTableName), rows1, false);
+
+ String rightTableName = "right_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " a2 int, "
+ + " c2 bigint, "
+ + " d2 int, "
+ + " primary key (c2, d2) NOT ENFORCED"
+ + ") with ("
+ + " 'connector' = 'fluss', "
+ + " 'bucket.key' = 'c2', "
+ + " 'table.merge-engine' = 'first_row', "
+ + " 'lookup.cache' = 'partial', "
+ + " 'lookup.partial-cache.max-rows' = '100' "
+ + ")",
+ rightTableName));
+ List<InternalRow> rows2 = Arrays.asList(row(1, 100L, 1));
+ writeRows(conn, TablePath.of(DEFAULT_DB, rightTableName), rows2,
false);
+
+ String sinkTableName = "sink_table_cache";
+ tEnv.executeSql(
+ String.format(
+ "create table %s ( "
+ + " a1 int, "
+ + " a2 int, "
+ + " primary key (a1) NOT ENFORCED" // Dummy PK
+ + ") with ("
+ + " 'connector' = 'fluss' "
+ + ")",
+ sinkTableName));
+
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+ OptimizerConfigOptions.DeltaJoinStrategy.FORCE);
+
+ String sql =
+ String.format(
+ "INSERT INTO %s SELECT T1.a1, T2.a2 FROM %s AS T1
INNER JOIN %s AS T2 ON T1.c1 = T2.c2 AND T1.d1 = T2.d2",
+ sinkTableName, leftTableName, rightTableName);
+
+ assertThat(tEnv.explainSql(sql))
+ .contains("DeltaJoin(joinType=[InnerJoin], where=[((c1 = c2)
AND (d1 = d2))]");
+
+ tEnv.executeSql(sql);
+
+ CloseableIterator<Row> collected =
+ tEnv.executeSql(String.format("select * from %s",
sinkTableName)).collect();
+ List<String> expected = Arrays.asList("+I[1, 1]", "-U[1, 1]", "+U[1,
1]");
+ assertResultsIgnoreOrder(collected, expected, true);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
b/fluss-flink/fluss-flink-2.2/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
similarity index 100%
rename from
fluss-flink/fluss-flink-2.1/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
rename to
fluss-flink/fluss-flink-2.2/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/resources/log4j2-test.properties
b/fluss-flink/fluss-flink-2.2/src/test/resources/log4j2-test.properties
similarity index 100%
rename from
fluss-flink/fluss-flink-2.1/src/test/resources/log4j2-test.properties
rename to fluss-flink/fluss-flink-2.2/src/test/resources/log4j2-test.properties
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
similarity index 55%
rename from
fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
rename to
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
index f3922bcbd..ad48cc99e 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/adapter/Flink21MultipleParameterToolTest.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SchemaAdapter.java
@@ -17,5 +17,27 @@
package org.apache.fluss.flink.adapter;
-/** Test for {@link MultipleParameterToolAdapter} in flink 2.1. */
-public class Flink21MultipleParameterToolTest extends
FlinkMultipleParameterToolTest {}
+import org.apache.flink.table.api.Schema;
+
+import java.util.List;
+
+/**
+ * An adapter for the schema with Index.
+ *
+ * <p>TODO: remove this class when no longer support all the Flink 1.x series.
+ */
+public class SchemaAdapter {
+ private SchemaAdapter() {}
+
+ public static Schema withIndex(Schema unresolvedSchema, List<List<String>>
indexes) {
+ Schema.Builder newSchemaBuilder =
Schema.newBuilder().fromSchema(unresolvedSchema);
+ if (!indexes.isEmpty()) {
+ throw new UnsupportedOperationException("Index is not supported.");
+ }
+ return newSchemaBuilder.build();
+ }
+
+ public static boolean supportIndex() {
+ return false;
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
index 72a4540ea..d5a4ff0aa 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
@@ -29,6 +29,8 @@ import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
/**
* Adapter for {@link SingleThreadMultiplexSourceReaderBase}.TODO: remove it
until not supported in
* flink 1.18.
+ *
+ * <p>TODO: remove this class when no longer support all the Flink 1.x series.
*/
public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
E, T, SplitT extends SourceSplit, SplitStateT>
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
new file mode 100644
index 000000000..5f7e9e76c
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+
+/**
+ * Flink sink adapter which hide the different version of createWriter method.
+ *
+ * <p>TODO: remove this class when no longer support all the Flink 1.x series.
+ */
+public abstract class SinkAdapter<InputT> implements Sink<InputT> {
+
+ @Override
+ public SinkWriter<InputT> createWriter(InitContext initContext) throws
IOException {
+ return createWriter(initContext.getMailboxExecutor(),
initContext.metricGroup());
+ }
+
+ @Override
+ public SinkWriter<InputT> createWriter(WriterInitContext
writerInitContext) throws IOException {
+ return createWriter(
+ writerInitContext.getMailboxExecutor(),
writerInitContext.metricGroup());
+ }
+
+ protected abstract SinkWriter<InputT> createWriter(
+ MailboxExecutor mailboxExecutor, SinkWriterMetricGroup
metricGroup);
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index fb7a41157..37e9f7c5e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -38,6 +38,7 @@ import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -86,6 +87,8 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
import static
org.apache.fluss.flink.FlinkConnectorOptions.ALTER_DISALLOW_OPTIONS;
+import static org.apache.fluss.flink.adapter.SchemaAdapter.supportIndex;
+import static org.apache.fluss.flink.adapter.SchemaAdapter.withIndex;
import static
org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionAlreadyExists;
import static
org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionInvalid;
import static
org.apache.fluss.flink.utils.CatalogExceptionUtils.isPartitionNotExist;
@@ -344,7 +347,11 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
if (CatalogBaseTable.TableKind.TABLE ==
catalogBaseTable.getTableKind()) {
- return ((CatalogTable) catalogBaseTable).copy(newOptions);
+ CatalogTable table = ((CatalogTable)
catalogBaseTable).copy(newOptions);
+ if (supportIndex()) {
+ table = wrapWithIndexes(table, tableInfo);
+ }
+ return table;
} else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE
== catalogBaseTable.getTableKind()) {
return ((CatalogMaterializedTable)
catalogBaseTable).copy(newOptions);
@@ -806,4 +813,55 @@ public class FlinkCatalog extends AbstractCatalog {
}
return lakeCatalogProperties;
}
+
+ private CatalogTable wrapWithIndexes(CatalogTable table, TableInfo
tableInfo) {
+
+ Optional<Schema.UnresolvedPrimaryKey> pkOp =
table.getUnresolvedSchema().getPrimaryKey();
+ // If there is no pk, return directly.
+ if (!pkOp.isPresent()) {
+ return table;
+ }
+
+ List<List<String>> indexes = new ArrayList<>();
+ // Pk is always an index.
+ indexes.add(pkOp.get().getColumnNames());
+
+ // Judge whether we can do prefix lookup.
+ List<String> bucketKeys = tableInfo.getBucketKeys();
+ // For partition table, the physical primary key is the primary key
that excludes the
+ // partition key
+ List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
+ List<String> indexKeys = new ArrayList<>();
+ if (isPrefixList(physicalPrimaryKeys, bucketKeys)) {
+ indexKeys.addAll(bucketKeys);
+ if (tableInfo.isPartitioned()) {
+ indexKeys.addAll(tableInfo.getPartitionKeys());
+ }
+ }
+
+ if (!indexKeys.isEmpty()) {
+ indexes.add(indexKeys);
+ }
+ return CatalogTable.newBuilder()
+ .schema(withIndex(table.getUnresolvedSchema(), indexes))
+ .comment(table.getComment())
+ .partitionKeys(table.getPartitionKeys())
+ .options(table.getOptions())
+ .snapshot(table.getSnapshot().orElse(null))
+ .distribution(table.getDistribution().orElse(null))
+ .build();
+ }
+
+ private static boolean isPrefixList(List<String> fullList, List<String>
prefixList) {
+ if (fullList.size() <= prefixList.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < prefixList.size(); i++) {
+ if (!fullList.get(i).equals(prefixList.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
index a35d48117..fcdf9ab42 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.sink;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.adapter.SinkAdapter;
import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
import org.apache.fluss.flink.sink.writer.AppendSinkWriter;
import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
@@ -27,9 +28,8 @@ import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.TablePath;
import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,7 +37,6 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -45,7 +44,7 @@ import static
org.apache.fluss.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.fluss.flink.utils.FlinkConversions.toFlussRowType;
/** Flink sink for Fluss. */
-class FlinkSink<InputT> implements Sink<InputT>,
SupportsPreWriteTopology<InputT> {
+class FlinkSink<InputT> extends SinkAdapter<InputT> implements
SupportsPreWriteTopology<InputT> {
private static final long serialVersionUID = 1L;
@@ -55,20 +54,11 @@ class FlinkSink<InputT> implements Sink<InputT>,
SupportsPreWriteTopology<InputT
this.builder = builder;
}
- @Deprecated
@Override
- public SinkWriter<InputT> createWriter(InitContext context) throws
IOException {
- FlinkSinkWriter<InputT> flinkSinkWriter =
- builder.createWriter(context.getMailboxExecutor());
-
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
- return flinkSinkWriter;
- }
-
- @Override
- public SinkWriter<InputT> createWriter(WriterInitContext context) throws
IOException {
- FlinkSinkWriter<InputT> flinkSinkWriter =
- builder.createWriter(context.getMailboxExecutor());
-
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
+ protected SinkWriter<InputT> createWriter(
+ MailboxExecutor mailboxExecutor, SinkWriterMetricGroup
metricGroup) {
+ FlinkSinkWriter<InputT> flinkSinkWriter =
builder.createWriter(mailboxExecutor);
+
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(metricGroup));
return flinkSinkWriter;
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
new file mode 100644
index 000000000..7a018bc02
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/ResolvedCatalogMaterializedTableAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+/**
+ * Adapter for {@link ResolvedCatalogMaterializedTable} because the
constructor is compatibility in
+ * flink 2.2. However, this constructor only used in test.
+ *
+ * <p>TODO: remove it until <a
href="https://issues.apache.org/jira/browse/FLINK-38532">...</a> is
+ * fixed.
+ */
+public class ResolvedCatalogMaterializedTableAdapter {
+
+ public static ResolvedCatalogMaterializedTable create(
+ CatalogMaterializedTable origin,
+ ResolvedSchema resolvedSchema,
+ CatalogMaterializedTable.RefreshMode refreshMode,
+ IntervalFreshness freshness) {
+ return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 9b915a74d..6c65544c4 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter;
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.utils.FlinkConversionsTest;
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -40,7 +41,6 @@ import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
@@ -158,7 +158,11 @@ class FlinkCatalogTest {
.refreshMode(refreshMode)
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
.build();
- return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
+ return ResolvedCatalogMaterializedTableAdapter.create(
+ origin,
+ resolvedSchema,
+ refreshMode,
+ IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND));
}
protected FlinkCatalog initCatalog(
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index 8c0adb3a2..03f853f61 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -359,11 +359,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
String query = String.format("SELECT COUNT(*) FROM %s", tableName);
assertThat(tEnv.explainSql(query))
.contains(
- String.format(
- "TableSourceScan(table=[[testcatalog,
defaultdb, %s, project=[id], "
- + "aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], "
- + "fields=[count1$0])",
- tableName));
+ "aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
List<String> collected = collectRowsWithTimeout(iterRows, 1);
List<String> expected =
Collections.singletonList(String.format("+I[%s]", expectedRows));
@@ -479,7 +475,7 @@ abstract class FlinkTableSourceBatchITCase extends
FlinkTestBase {
return tableName;
}
- private String preparePartitionedLogTable() throws Exception {
+ protected String preparePartitionedLogTable() throws Exception {
String tableName = String.format("test_partitioned_log_table_%s",
RandomUtils.nextInt());
tEnv.executeSql(
String.format(
diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml
index 8bd4c11ff..d759f5356 100644
--- a/fluss-flink/pom.xml
+++ b/fluss-flink/pom.xml
@@ -36,7 +36,7 @@
<module>fluss-flink-1.20</module>
<module>fluss-flink-1.19</module>
<module>fluss-flink-1.18</module>
- <module>fluss-flink-2.1</module>
+ <module>fluss-flink-2.2</module>
<module>fluss-flink-tiering</module>
</modules>
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index f82aa3321..7f0a4cc9b 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -70,11 +70,12 @@
<dependency>
<groupId>org.apache.fluss</groupId>
- <artifactId>fluss-flink-2.1</artifactId>
+ <artifactId>fluss-flink-2.2</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-flink-1.20</artifactId>