This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cf28445c76 [INLONG-8358][Sort] Add kafka connector on flink 1.15
(#8713)
cf28445c76 is described below
commit cf28445c7627be0ffbe627f10b274bca3476011a
Author: Hao <[email protected]>
AuthorDate: Tue Dec 12 11:03:36 2023 +0800
[INLONG-8358][Sort] Add kafka connector on flink 1.15 (#8713)
---
.../src/main/assemblies/sort-connectors-v1.15.xml | 8 +
inlong-sort/sort-core/pom.xml | 6 +
.../sort-end-to-end-tests-v1.15/pom.xml | 14 +-
...MysqlToRocksITCase.java => KafkaE2EITCase.java} | 122 +++--
.../inlong/sort/tests/MysqlToRocksITCase.java | 13 +-
.../test/resources/env/kafka_test_kafka_init.txt | 1 +
.../src/test/resources/flinkSql/kafka_test.sql | 61 +++
.../sort-flink-v1.15/sort-connectors/kafka/pom.xml | 179 +++++++
.../org/apache/inlong/sort/kafka/KafkaOptions.java | 50 ++
.../kafka/table/KafkaConnectorOptionsUtil.java | 585 +++++++++++++++++++++
.../sort/kafka/table/KafkaDynamicTableFactory.java | 442 ++++++++++++++++
.../table/UpsertKafkaDynamicTableFactory.java | 416 +++++++++++++++
.../org.apache.flink.table.factories.Factory | 17 +
.../sort-flink-v1.15/sort-connectors/pom.xml | 1 +
licenses/inlong-sort-connectors/LICENSE | 6 +
pom.xml | 20 +-
16 files changed, 1900 insertions(+), 41 deletions(-)
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 855b66f858..1df930a929 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -105,6 +105,14 @@
<includes>
<include>sort-connector-hudi-v1.15-${project.version}.jar</include>
</includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ <fileSet>
+
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-kafka-v1.15-${project.version}.jar</include>
+ </includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index a18d177315..e8974e29f0 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -299,6 +299,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-kafka-v1.15</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 07d8663682..abdb6e6500 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -46,12 +46,14 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
- <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
- <version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
@@ -223,6 +225,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-kafka-v1.15</artifactId>
+ <version>${project.version}</version>
+
<destFileName>sort-connector-kafka.jar</destFileName>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
</artifactItems>
</configuration>
<executions>
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
similarity index 53%
copy from
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
copy to
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
index bbeccd04a5..1399fe2f6f 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
@@ -29,9 +30,17 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+import java.io.IOException;
+import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -40,7 +49,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
@@ -49,28 +61,42 @@ import static
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRock
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
/**
- * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
- * Test flink sql Mysql cdc to StarRocks
+ * End-to-end tests for sort-connector-kafka uber jar.
*/
-public class MysqlToRocksITCase extends FlinkContainerTestEnv {
+public class KafkaE2EITCase extends FlinkContainerTestEnv {
- private static final Logger LOG =
LoggerFactory.getLogger(MysqlToRocksITCase.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaE2EITCase.class);
+ public static final Logger MYSQL_LOG =
LoggerFactory.getLogger(MySqlContainer.class);
+
+ public static final Logger KAFKA_LOG =
LoggerFactory.getLogger(KafkaContainer.class);
+
+ private static final Path kafkaJar =
TestUtils.getResource("sort-connector-kafka.jar");
private static final Path mysqlJar =
TestUtils.getResource("sort-connector-mysql-cdc.jar");
- private static final Path jdbcJar =
TestUtils.getResource("sort-connector-starrocks.jar");
+ private static final Path starrocksJar =
TestUtils.getResource("sort-connector-starrocks.jar");
private static final Path mysqlJdbcJar =
TestUtils.getResource("mysql-driver.jar");
+
private static final String sqlFile;
static {
try {
- sqlFile =
-
Paths.get(MysqlToRocksITCase.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
+ URI kafkaSqlFile =
+
Objects.requireNonNull(KafkaE2EITCase.class.getResource("/flinkSql/kafka_test.sql")).toURI();
+ sqlFile = Paths.get(kafkaSqlFile).toString();
buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
+ @ClassRule
+ public static final KafkaContainer KAFKA =
+ new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("kafka")
+ .withEmbeddedZookeeper()
+ .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG));
+
@ClassRule
public static StarRocksContainer STAR_ROCKS =
(StarRocksContainer) new
StarRocksContainer(getNewStarRocksImageName())
@@ -86,7 +112,7 @@ public class MysqlToRocksITCase extends
FlinkContainerTestEnv {
.withDatabaseName("test")
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
@Before
public void setup() {
@@ -103,7 +129,7 @@ public class MysqlToRocksITCase extends
FlinkContainerTestEnv {
MYSQL_CONTAINER.getPassword());
Statement stat = conn.createStatement();
stat.execute(
- "CREATE TABLE test_input1 (\n"
+ "CREATE TABLE test_input (\n"
+ " id SERIAL,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ " description VARCHAR(512),\n"
@@ -118,49 +144,83 @@ public class MysqlToRocksITCase extends
FlinkContainerTestEnv {
@AfterClass
public static void teardown() {
+ if (KAFKA != null) {
+ KAFKA.stop();
+ }
+
if (MYSQL_CONTAINER != null) {
MYSQL_CONTAINER.stop();
}
+
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
}
+ private void initializeKafkaTable(String topic) {
+ String fileName = "kafka_test_kafka_init.txt";
+ int port = KafkaContainer.ZOOKEEPER_PORT;
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("TOPIC", topic);
+ properties.put("ZOOKEEPER_PORT", port);
+
+ try {
+ String createKafkaStatement = getCreateStatement(fileName,
properties);
+ ExecResult result = KAFKA.execInContainer("bash", "-c",
createKafkaStatement);
+ LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement,
result.getStdout());
+ if (result.getExitCode() != 0) {
+ throw new RuntimeException("Init kafka topic failed. Exit
code:" + result.getExitCode());
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getCreateStatement(String fileName, Map<String, Object>
properties) {
+ URL url =
Objects.requireNonNull(KafkaE2EITCase.class.getResource("/env/" + fileName));
+
+ try {
+ Path file = Paths.get(url.toURI());
+ return PlaceholderResolver.getDefaultResolver().resolveByMap(
+ new String(Files.readAllBytes(file),
StandardCharsets.UTF_8),
+ properties);
+ } catch (IOException | URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
- * Test flink sql postgresql cdc to StarRocks
+ * Test flink sql mysql cdc to kafka, then kafka to starrocks.
*
- * @throws Exception The exception may throws when execute the case
+ * @throws Exception if an error occurs while executing the test case
*/
@Test
- public void testMysqlUpdateAndDelete() throws Exception {
- submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+ public void testKafkaWithSqlFile() throws Exception {
+ final String topic = "test-topic";
+ initializeKafkaTable(topic);
+
+ submitSQLJob(sqlFile, kafkaJar, starrocksJar, mysqlJar, mysqlJdbcJar);
waitUntilJobRunning(Duration.ofSeconds(10));
// generate input
- try (Connection conn =
- DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
+ try (Connection conn =
DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword());
Statement stat = conn.createStatement()) {
- stat.execute(
- "INSERT INTO test_input1 "
- + "VALUES (1,'jacket','water resistent white wind
breaker');");
- stat.execute(
- "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel
scooter ');");
- stat.execute(
- "update test_input1 set name = 'tom' where id = 2;");
- stat.execute(
- "delete from test_input1 where id = 1;");
+ stat.execute("INSERT INTO test_input VALUES (1,'jacket','water
resistant white wind breaker');");
+ stat.execute("INSERT INTO test_input VALUES (2,'scooter','Big
2-wheel scooter ');");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
- JdbcProxy proxy =
- new JdbcProxy(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
- STAR_ROCKS.getPassword(),
- STAR_ROCKS.getDriverClassName());
- List<String> expectResult =
- Arrays.asList("2,tom,Big 2-wheel scooter ");
+ JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
+ STAR_ROCKS.getPassword(),
+ STAR_ROCKS.getDriverClassName());
+
+ List<String> expectResult = Arrays.asList(
+ "1,jacket,water resistant white wind breaker",
+ "2,scooter,Big 2-wheel scooter ");
proxy.checkResultWithTimeout(
expectResult,
"test_output1",
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
index bbeccd04a5..51501772a9 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksITCase.java
@@ -39,7 +39,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
@@ -155,12 +155,11 @@ public class MysqlToRocksITCase extends
FlinkContainerTestEnv {
throw e;
}
- JdbcProxy proxy =
- new JdbcProxy(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
- STAR_ROCKS.getPassword(),
- STAR_ROCKS.getDriverClassName());
- List<String> expectResult =
- Arrays.asList("2,tom,Big 2-wheel scooter ");
+ JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
+ STAR_ROCKS.getPassword(),
+ STAR_ROCKS.getDriverClassName());
+
+ List<String> expectResult = Collections.singletonList("2,tom,Big
2-wheel scooter ");
proxy.checkResultWithTimeout(
expectResult,
"test_output1",
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt
new file mode 100644
index 0000000000..b2f31d78fa
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/kafka_test_kafka_init.txt
@@ -0,0 +1 @@
+kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1
--zookeeper localhost:${ZOOKEEPER_PORT}
\ No newline at end of file
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql
new file mode 100644
index 0000000000..5bda3b9366
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/kafka_test.sql
@@ -0,0 +1,61 @@
+CREATE TABLE test_input (
+ `id` INT primary key,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'mysql-cdc-inlong',
+ 'hostname' = 'mysql',
+ 'port' = '3306',
+ 'username' = 'root',
+ 'password' = 'inlong',
+ 'database-name' = 'test',
+ 'table-name' = 'test_input',
+ 'scan.incremental.snapshot.enabled' = 'false',
+ 'jdbc.properties.useSSL' = 'false',
+ 'jdbc.properties.allowPublicKeyRetrieval' = 'true'
+);
+
+CREATE TABLE kafka_load (
+ `id` INT NOT NULL primary key,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'upsert-kafka-inlong',
+ 'topic' = 'test-topic',
+ 'properties.bootstrap.servers' = 'kafka:9092',
+ 'key.format' = 'csv',
+ 'value.format' = 'csv'
+);
+
+CREATE TABLE kafka_extract (
+ `id` INT NOT NULL,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'kafka-inlong',
+ 'topic' = 'test-topic',
+ 'properties.bootstrap.servers' = 'kafka:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'csv'
+);
+
+CREATE TABLE test_output (
+ `id` INT primary key,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'starrocks-inlong',
+ 'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+ 'load-url'='starrocks:8030',
+ 'database-name'='test',
+ 'table-name' = 'test_output1',
+ 'username' = 'inlong',
+ 'password' = 'inlong',
+ 'sink.properties.format' = 'json',
+ 'sink.properties.strip_outer_array' = 'true',
+ 'sink.buffer-flush.interval-ms' = '1000'
+);
+
+INSERT INTO kafka_load select * from test_input;
+INSERT INTO test_output select * from kafka_extract;
\ No newline at end of file
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
new file mode 100644
index 0000000000..61fcc35247
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.15</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-kafka-v1.15</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-kafka</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+ <include>org.apache.kafka:*</include>
+
<include>org.apache.flink:flink-connector-kafka</include>
+
<include>org.apache.flink:flink-connector-base</include>
+ <!-- Include fixed version 18.0-13.0 of
flink shaded guava -->
+
<include>org.apache.flink:flink-shaded-guava</include>
+
<include>org.apache.httpcomponents:*</include>
+
<include>org.apache.commons:commons-lang3</include>
+ <include>com.google.protobuf:*</include>
+ <include>joda-time:*</include>
+
<include>com.fasterxml.jackson.core:*</include>
+ <include>com.amazonaws:*</include>
+ <include>software.amazon.ion:*</include>
+
<include>commons-logging:commons-logging</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>org.apache.kafka:*</artifact>
+ <excludes>
+
<exclude>kafka/kafka-version.properties</exclude>
+ <exclude>LICENSE</exclude>
+ <!-- Does not contain anything
relevant.
+ Cites a binary dependency on
jersey, but this is neither reflected in the
+ dependency graph, nor are any
jersey files bundled. -->
+ <exclude>NOTICE</exclude>
+ <exclude>common/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+
<include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.kafka</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.commons.logging</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.commons.logging</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons.lang3</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.commons.lang3</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.http</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>com.google</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.amazonaws</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.com.amazonaws</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>software.amazon.ion</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.software.amazon.ion</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.joda.time</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.joda.time</shadedPattern>
+ </relocation>
+
+ <relocation>
+
<pattern>org.apache.inlong.sort.base</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.configuration</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.configuration</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.protocol</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.protocol</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.schema</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.schema</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.util</pattern>
+
<shadedPattern>org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.util</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
new file mode 100644
index 0000000000..6962eb6948
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for the Kafka table source and sink. */
+public class KafkaOptions {
+
+ private KafkaOptions() {
+ }
+
+ public static final ConfigOption<String> SINK_MULTIPLE_PARTITION_PATTERN =
+ ConfigOptions.key("sink.multiple.partition-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "option 'sink.multiple.partition-pattern' used
either when the partitioner is raw-hash, or when passing in designated
partition field names for custom field partitions");
+
+ public static final ConfigOption<String> SINK_FIXED_IDENTIFIER =
+ ConfigOptions.key("sink.fixed.identifier")
+ .stringType()
+ .defaultValue("-1");
+
+ //
--------------------------------------------------------------------------------------------
+ // Sink specific options
+ //
--------------------------------------------------------------------------------------------
+ public static final ConfigOption<Boolean> KAFKA_IGNORE_ALL_CHANGELOG =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Regard upsert delete as insert kind.");
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
new file mode 100644
index 0000000000..06cf40ea49
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/** Utilities for {@link KafkaConnectorOptions}. */
+@Internal
+class KafkaConnectorOptionsUtil {
+
+ private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
+
ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
+
+ //
--------------------------------------------------------------------------------------------
+ // Option enumerations
+ //
--------------------------------------------------------------------------------------------
+
+ // Prefix for Kafka specific properties.
+ public static final String PROPERTIES_PREFIX = "properties.";
+
+ // Sink partitioner.
+ public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
+ public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+ public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN =
"round-robin";
+
+ // Other keywords.
+ private static final String PARTITION = "partition";
+ private static final String OFFSET = "offset";
+ protected static final String AVRO_CONFLUENT = "avro-confluent";
+ protected static final String DEBEZIUM_AVRO_CONFLUENT =
"debezium-avro-confluent";
+ private static final List<String> SCHEMA_REGISTRY_FORMATS =
+ Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);
+
+ //
--------------------------------------------------------------------------------------------
+ // Validation
+ //
--------------------------------------------------------------------------------------------
+
+ public static void validateTableSourceOptions(ReadableConfig tableOptions)
{
+ validateSourceTopic(tableOptions);
+ validateScanStartupMode(tableOptions);
+ }
+
+ public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+ validateSinkTopic(tableOptions);
+ validateSinkPartitioner(tableOptions);
+ }
+
+ public static void validateSourceTopic(ReadableConfig tableOptions) {
+ Optional<List<String>> topic = tableOptions.getOptional(TOPIC);
+ Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+ if (topic.isPresent() && pattern.isPresent()) {
+ throw new ValidationException(
+ "Option 'topic' and 'topic-pattern' shouldn't be set
together.");
+ }
+
+ if (!topic.isPresent() && !pattern.isPresent()) {
+ throw new ValidationException("Either 'topic' or 'topic-pattern'
must be set.");
+ }
+ }
+
+ public static void validateSinkTopic(ReadableConfig tableOptions) {
+ String errorMessageTemp =
+ "Flink Kafka sink currently only supports single topic, but
got %s: %s.";
+ if (!isSingleTopic(tableOptions)) {
+ if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+ throw new ValidationException(
+ String.format(
+ errorMessageTemp,
+ "'topic-pattern'",
+ tableOptions.get(TOPIC_PATTERN)));
+ } else {
+ throw new ValidationException(
+ String.format(errorMessageTemp, "'topic'",
tableOptions.get(TOPIC)));
+ }
+ }
+ }
+
+ private static void validateScanStartupMode(ReadableConfig tableOptions) {
+ tableOptions
+ .getOptional(SCAN_STARTUP_MODE)
+ .ifPresent(
+ mode -> {
+ switch (mode) {
+ case TIMESTAMP:
+ if (!tableOptions
+
.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS)
+ .isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "'%s' is required in
'%s' startup mode"
+ + " but
missing.",
+
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+
KafkaConnectorOptions.ScanStartupMode.TIMESTAMP));
+ }
+
+ break;
+ case SPECIFIC_OFFSETS:
+ if (!tableOptions
+
.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS)
+ .isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "'%s' is required in
'%s' startup mode"
+ + " but
missing.",
+
SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+
KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS));
+ }
+ if (!isSingleTopic(tableOptions)) {
+ throw new ValidationException(
+ "Currently Kafka source only
supports specific offset for single topic.");
+ }
+ String specificOffsets =
+
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+ parseSpecificOffsets(
+ specificOffsets,
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+
+ break;
+ }
+ });
+ }
+
+ private static void validateSinkPartitioner(ReadableConfig tableOptions) {
+ tableOptions
+ .getOptional(SINK_PARTITIONER)
+ .ifPresent(
+ partitioner -> {
+ if
(partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)
+ &&
tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+ throw new ValidationException(
+ "Currently 'round-robin' partitioner
only works when option 'key.fields' is not specified.");
+ } else if (partitioner.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s' should be a
non-empty string.",
+ SINK_PARTITIONER.key()));
+ }
+ });
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Utilities
+ //
--------------------------------------------------------------------------------------------
+
+ public static List<String> getSourceTopics(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(TOPIC).orElse(null);
+ }
+
+ public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
+ return
tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
+ }
+
+ private static boolean isSingleTopic(ReadableConfig tableOptions) {
+ // Option 'topic-pattern' is regarded as multi-topics.
+ return tableOptions.getOptional(TOPIC).map(t -> t.size() ==
1).orElse(false);
+ }
+
+ /**
+ * Parses SpecificOffsets String to Map.
+ *
+ * <p>The SpecificOffsets string must have the following format:
+ *
+ * <pre>
+ * scan.startup.specific-offsets =
partition:0,offset:42;partition:1,offset:300
+ * </pre>
+ *
+ * @return SpecificOffsets with Map format, key is partition, and value is
offset
+ */
+ public static Map<Integer, Long> parseSpecificOffsets(
+ String specificOffsetsStr, String optionKey) {
+ final Map<Integer, Long> offsetMap = new HashMap<>();
+ final String[] pairs = specificOffsetsStr.split(";");
+ final String validationExceptionMessage =
+ String.format(
+ "Invalid properties '%s' should follow the format "
+ +
"'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+ optionKey, specificOffsetsStr);
+
+ if (pairs.length == 0) {
+ throw new ValidationException(validationExceptionMessage);
+ }
+
+ for (String pair : pairs) {
+ if (null == pair || pair.length() == 0 || !pair.contains(",")) {
+ throw new ValidationException(validationExceptionMessage);
+ }
+
+ final String[] kv = pair.split(",");
+ if (kv.length != 2
+ || !kv[0].startsWith(PARTITION + ':')
+ || !kv[1].startsWith(OFFSET + ':')) {
+ throw new ValidationException(validationExceptionMessage);
+ }
+
+ String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
+ String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
+ try {
+ final Integer partition = Integer.valueOf(partitionValue);
+ final Long offset = Long.valueOf(offsetValue);
+ offsetMap.put(partition, offset);
+ } catch (NumberFormatException e) {
+ throw new ValidationException(validationExceptionMessage, e);
+ }
+ }
+ return offsetMap;
+ }
+
+ public static StartupOptions getStartupOptions(ReadableConfig
tableOptions) {
+ final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+ final StartupMode startupMode =
+ tableOptions
+ .getOptional(SCAN_STARTUP_MODE)
+ .map(KafkaConnectorOptionsUtil::fromOption)
+ .orElse(StartupMode.GROUP_OFFSETS);
+ if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
+ // This will be refactored once specific offsets are supported for multiple topics
+ // (FLINK-18602). validateScanStartupMode() has already checked that
+ // tableOptions.get(TOPIC) contains exactly one topic.
+ buildSpecificOffsets(tableOptions, tableOptions.get(TOPIC).get(0),
specificOffsets);
+ }
+
+ final StartupOptions options = new StartupOptions();
+ options.startupMode = startupMode;
+ options.specificOffsets = specificOffsets;
+ if (startupMode == StartupMode.TIMESTAMP) {
+ options.startupTimestampMillis =
tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ }
+ return options;
+ }
+
+ private static void buildSpecificOffsets(
+ ReadableConfig tableOptions,
+ String topic,
+ Map<KafkaTopicPartition, Long> specificOffsets) {
+ String specificOffsetsStrOpt =
tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+ final Map<Integer, Long> offsetMap =
+ parseSpecificOffsets(specificOffsetsStrOpt,
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+ offsetMap.forEach(
+ (partition, offset) -> {
+ final KafkaTopicPartition topicPartition =
+ new KafkaTopicPartition(topic, partition);
+ specificOffsets.put(topicPartition, offset);
+ });
+ }
+
+ /**
+ * Returns the {@link StartupMode} of Kafka Consumer by passed-in
table-specific {@link
+ * KafkaConnectorOptions.ScanStartupMode}.
+ */
+ private static StartupMode
fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
+ switch (scanStartupMode) {
+ case EARLIEST_OFFSET:
+ return StartupMode.EARLIEST;
+ case LATEST_OFFSET:
+ return StartupMode.LATEST;
+ case GROUP_OFFSETS:
+ return StartupMode.GROUP_OFFSETS;
+ case SPECIFIC_OFFSETS:
+ return StartupMode.SPECIFIC_OFFSETS;
+ case TIMESTAMP:
+ return StartupMode.TIMESTAMP;
+
+ default:
+ throw new TableException(
+ "Unsupported startup mode. Validator should have
checked that.");
+ }
+ }
+
+ static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+ if (tableOptions.get(DELIVERY_GUARANTEE) ==
DeliveryGuarantee.EXACTLY_ONCE
+ &&
!tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
+ throw new ValidationException(
+ TRANSACTIONAL_ID_PREFIX.key()
+ + " must be specified when using
DeliveryGuarantee.EXACTLY_ONCE.");
+ }
+ }
+
+ /**
+ * Creates an array of indices that determine which physical fields of the
table schema to
+ * include in the key format and the order that those fields have in the
key format.
+ *
+ * <p>See {@link KafkaConnectorOptions#KEY_FORMAT}, {@link
KafkaConnectorOptions#KEY_FIELDS},
+ * and {@link KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more
information.
+ */
+ public static int[] createKeyFormatProjection(
+ ReadableConfig options, DataType physicalDataType) {
+ final LogicalType physicalType = physicalDataType.getLogicalType();
+ Preconditions.checkArgument(
+ physicalType.is(LogicalTypeRoot.ROW), "Row data type
expected.");
+ final Optional<String> optionalKeyFormat =
options.getOptional(KEY_FORMAT);
+ final Optional<List<String>> optionalKeyFields =
options.getOptional(KEY_FIELDS);
+
+ if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "The option '%s' can only be declared if a key
format is defined using '%s'.",
+ KEY_FIELDS.key(), KEY_FORMAT.key()));
+ } else if (optionalKeyFormat.isPresent()
+ && (!optionalKeyFields.isPresent() ||
optionalKeyFields.get().size() == 0)) {
+ throw new ValidationException(
+ String.format(
+ "A key format '%s' requires the declaration of one
or more of key fields using '%s'.",
+ KEY_FORMAT.key(), KEY_FIELDS.key()));
+ }
+
+ if (!optionalKeyFormat.isPresent()) {
+ return new int[0];
+ }
+
+ final String keyPrefix =
options.getOptional(KEY_FIELDS_PREFIX).orElse("");
+
+ final List<String> keyFields = optionalKeyFields.get();
+ final List<String> physicalFields =
LogicalTypeChecks.getFieldNames(physicalType);
+ return keyFields.stream()
+ .mapToInt(
+ keyField -> {
+ final int pos = physicalFields.indexOf(keyField);
+ // check that field name exists
+ if (pos < 0) {
+ throw new ValidationException(
+ String.format(
+ "Could not find the field '%s'
in the table schema for usage in the key format. "
+ + "A key field must be
a regular, physical column. "
+ + "The following
columns can be selected in the '%s' option:\n"
+ + "%s",
+ keyField, KEY_FIELDS.key(),
physicalFields));
+ }
+ // check that field name is prefixed correctly
+ if (!keyField.startsWith(keyPrefix)) {
+ throw new ValidationException(
+ String.format(
+ "All fields in '%s' must be
prefixed with '%s' when option '%s' "
+ + "is set but field
'%s' is not prefixed.",
+ KEY_FIELDS.key(),
+ keyPrefix,
+ KEY_FIELDS_PREFIX.key(),
+ keyField));
+ }
+ return pos;
+ })
+ .toArray();
+ }
+
+ /**
+ * Creates an array of indices that determine which physical fields of the
table schema to
+ * include in the value format.
+ *
+ * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link
+ * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@link
+ * KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
+ */
+ public static int[] createValueFormatProjection(
+ ReadableConfig options, DataType physicalDataType) {
+ final LogicalType physicalType = physicalDataType.getLogicalType();
+ Preconditions.checkArgument(
+ physicalType.is(LogicalTypeRoot.ROW), "Row data type
expected.");
+ final int physicalFieldCount =
LogicalTypeChecks.getFieldCount(physicalType);
+ final IntStream physicalFields = IntStream.range(0,
physicalFieldCount);
+
+ final String keyPrefix =
options.getOptional(KEY_FIELDS_PREFIX).orElse("");
+
+ final KafkaConnectorOptions.ValueFieldsStrategy strategy =
options.get(VALUE_FIELDS_INCLUDE);
+ if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) {
+ if (keyPrefix.length() > 0) {
+ throw new ValidationException(
+ String.format(
+ "A key prefix is not allowed when option '%s'
is set to '%s'. "
+ + "Set it to '%s' instead to avoid
field overlaps.",
+ VALUE_FIELDS_INCLUDE.key(),
+ KafkaConnectorOptions.ValueFieldsStrategy.ALL,
+
KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY));
+ }
+ return physicalFields.toArray();
+ } else if (strategy ==
KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) {
+ final int[] keyProjection = createKeyFormatProjection(options,
physicalDataType);
+ return physicalFields
+ .filter(pos -> IntStream.of(keyProjection).noneMatch(k ->
k == pos))
+ .toArray();
+ }
+ throw new TableException("Unknown value fields strategy:" + strategy);
+ }
+
+ /**
+ * Returns a new table context with a default schema registry subject
value in the options if
+ * the format is a schema registry format (e.g. 'avro-confluent') and the
subject is not
+ * defined.
+ */
+ public static DynamicTableFactory.Context
autoCompleteSchemaRegistrySubject(
+ DynamicTableFactory.Context context) {
+ Map<String, String> tableOptions =
context.getCatalogTable().getOptions();
+ Map<String, String> newOptions =
autoCompleteSchemaRegistrySubject(tableOptions);
+ if (newOptions.size() > tableOptions.size()) {
+ // build a new context
+ return new FactoryUtil.DefaultDynamicTableContext(
+ context.getObjectIdentifier(),
+ context.getCatalogTable().copy(newOptions),
+ context.getEnrichmentOptions(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ } else {
+ return context;
+ }
+ }
+
+ private static Map<String, String> autoCompleteSchemaRegistrySubject(
+ Map<String, String> options) {
+ Configuration configuration = Configuration.fromMap(options);
+ // the subject autoComplete should only be used in sink, check the
topic first
+ validateSinkTopic(configuration);
+ final Optional<String> valueFormat =
configuration.getOptional(VALUE_FORMAT);
+ final Optional<String> keyFormat =
configuration.getOptional(KEY_FORMAT);
+ final Optional<String> format = configuration.getOptional(FORMAT);
+ final String topic = configuration.get(TOPIC).get(0);
+
+ if (format.isPresent() &&
SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
+ autoCompleteSubject(configuration, format.get(), topic + "-value");
+ } else if (valueFormat.isPresent() &&
SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
+ autoCompleteSubject(configuration, "value." + valueFormat.get(),
topic + "-value");
+ }
+
+ if (keyFormat.isPresent() &&
SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
+ autoCompleteSubject(configuration, "key." + keyFormat.get(), topic
+ "-key");
+ }
+ return configuration.toMap();
+ }
+
+ private static void autoCompleteSubject(
+ Configuration configuration, String format, String subject) {
+ ConfigOption<String> subjectOption =
+ ConfigOptions.key(format + "." + SCHEMA_REGISTRY_SUBJECT.key())
+ .stringType()
+ .noDefaultValue();
+ if (!configuration.getOptional(subjectOption).isPresent()) {
+ configuration.setString(subjectOption, subject);
+ }
+ }
+
+ /**
+ * The partitioner can be "default", "fixed", "round-robin" or the full class name of a
+ * customized partitioner.
+ */
+ public static Optional<FlinkKafkaPartitioner<RowData>>
getFlinkKafkaPartitioner(
+ ReadableConfig tableOptions, ClassLoader classLoader) {
+ return tableOptions
+ .getOptional(SINK_PARTITIONER)
+ .flatMap(
+ (String partitioner) -> {
+ switch (partitioner) {
+ case SINK_PARTITIONER_VALUE_FIXED:
+ return Optional.of(new
FlinkFixedPartitioner<>());
+ case SINK_PARTITIONER_VALUE_DEFAULT:
+ case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+ return Optional.empty();
+ // Default fallback to full class name of the
partitioner.
+ default:
+ return Optional.of(
+ initializePartitioner(partitioner,
classLoader));
+ }
+ });
+ }
+
+ /** Loads and instantiates the partitioner class with the given name. */
+ private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
+ String name, ClassLoader classLoader) {
+ try {
+ Class<?> clazz = Class.forName(name, true, classLoader);
+ if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+ throw new ValidationException(
+ String.format(
+ "Sink partitioner class '%s' should extend
from the required class %s",
+ name, FlinkKafkaPartitioner.class.getName()));
+ }
+ @SuppressWarnings("unchecked")
+ final FlinkKafkaPartitioner<T> kafkaPartitioner =
+ InstantiationUtil.instantiate(name,
FlinkKafkaPartitioner.class, classLoader);
+
+ return kafkaPartitioner;
+ } catch (ClassNotFoundException | FlinkException e) {
+ throw new ValidationException(
+ String.format("Could not find and instantiate partitioner
class '%s'", name),
+ e);
+ }
+ }
+
+ public static Properties getKafkaProperties(Map<String, String>
tableOptions) {
+ final Properties kafkaProperties = new Properties();
+
+ if (hasKafkaClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey =
key.substring((PROPERTIES_PREFIX).length());
+ kafkaProperties.put(subKey, value);
+ });
+ }
+ return kafkaProperties;
+ }
+
+ /**
+ * Decides whether the table options contain Kafka client properties, i.e. keys that start
+ * with the prefix 'properties.'.
+ */
+ private static boolean hasKafkaClientProperties(Map<String, String>
tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k ->
k.startsWith(PROPERTIES_PREFIX));
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Inner classes
+ //
--------------------------------------------------------------------------------------------
+
+ /** Kafka startup options. */
+ public static class StartupOptions {
+
+ public StartupMode startupMode;
+ public Map<KafkaTopicPartition, Long> specificOffsets;
+ public long startupTimestampMillis;
+ }
+
+ private KafkaConnectorOptionsUtil() {
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
new file mode 100644
index 0000000000..c87735ffc8
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.kafka.KafkaOptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.StartupOptions;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions;
+
+/**
+ * Factory for creating configured instances of {@link KafkaDynamicSource} and
{@link
+ * KafkaDynamicSink}.
+ */
+@Internal
+public class KafkaDynamicTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaDynamicTableFactory.class);
+
+ private static final ConfigOption<String> SINK_SEMANTIC =
+ ConfigOptions.key("sink.semantic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional semantic when committing.");
+
+ public static final String IDENTIFIER = "kafka-inlong";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FORMAT);
+ options.add(KEY_FORMAT);
+ options.add(KEY_FIELDS);
+ options.add(KEY_FIELDS_PREFIX);
+ options.add(VALUE_FORMAT);
+ options.add(VALUE_FIELDS_INCLUDE);
+ options.add(TOPIC);
+ options.add(TOPIC_PATTERN);
+ options.add(PROPS_GROUP_ID);
+ options.add(SCAN_STARTUP_MODE);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+ options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+ options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ options.add(SINK_PARTITIONER);
+ options.add(SINK_PARALLELISM);
+ options.add(DELIVERY_GUARANTEE);
+ options.add(TRANSACTIONAL_ID_PREFIX);
+ options.add(SINK_SEMANTIC);
+ options.add(Constants.INLONG_METRIC);
+ options.add(Constants.INLONG_AUDIT);
+ options.add(Constants.AUDIT_KEYS);
+ options.add(Constants.SINK_MULTIPLE_FORMAT);
+ options.add(Constants.PATTERN_PARTITION_MAP);
+ options.add(Constants.DATASOURCE_PARTITION_MAP);
+ options.add(Constants.SINK_SCHEMA_CHANGE_ENABLE);
+ options.add(Constants.SINK_SCHEMA_CHANGE_POLICIES);
+ options.add(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG);
+ options.add(KafkaOptions.SINK_MULTIPLE_PARTITION_PATTERN);
+ options.add(KafkaOptions.SINK_FIXED_IDENTIFIER);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> forwardOptions() {
+ return Stream.of(
+ PROPS_BOOTSTRAP_SERVERS,
+ PROPS_GROUP_ID,
+ TOPIC,
+ TOPIC_PATTERN,
+ SCAN_STARTUP_MODE,
+ SCAN_STARTUP_SPECIFIC_OFFSETS,
+ SCAN_TOPIC_PARTITION_DISCOVERY,
+ SCAN_STARTUP_TIMESTAMP_MILLIS,
+ SINK_PARTITIONER,
+ SINK_PARALLELISM,
+ TRANSACTIONAL_ID_PREFIX)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+
+ final Optional<DecodingFormat<DeserializationSchema<RowData>>>
keyDecodingFormat =
+ getKeyDecodingFormat(helper);
+
+ final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat =
+ getValueDecodingFormat(helper);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final String valueFormatPrefix =
tableOptions.getOptional(FORMAT).orElse(tableOptions.get(VALUE_FORMAT));
+ LOG.info("valueFormatPrefix is {}", valueFormatPrefix);
+ helper.validateExcept(PROPERTIES_PREFIX, Constants.DIRTY_PREFIX,
valueFormatPrefix);
+
+ validateTableSourceOptions(tableOptions);
+
+ validatePKConstraints(
+ context.getObjectIdentifier(),
+ context.getPrimaryKeyIndexes(),
+ context.getCatalogTable().getOptions(),
+ valueDecodingFormat);
+
+ final StartupOptions startupOptions = getStartupOptions(tableOptions);
+
+ final Properties properties =
getKafkaProperties(context.getCatalogTable().getOptions());
+
+ // add topic-partition discovery
+ final Optional<Long> partitionDiscoveryInterval =
+
tableOptions.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY).map(Duration::toMillis);
+ properties.setProperty(
+ KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+ partitionDiscoveryInterval.orElse(-1L).toString());
+
+ final DataType physicalDataType = context.getPhysicalRowDataType();
+
+ final int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+
+ final int[] valueProjection =
createValueFormatProjection(tableOptions, physicalDataType);
+
+ final String keyPrefix =
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+ return createKafkaTableSource(
+ physicalDataType,
+ keyDecodingFormat.orElse(null),
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ getSourceTopics(tableOptions),
+ getSourceTopicPattern(tableOptions),
+ properties,
+ startupOptions.startupMode,
+ startupOptions.specificOffsets,
+ startupOptions.startupTimestampMillis,
+ context.getObjectIdentifier().asSummaryString());
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(
+ this, autoCompleteSchemaRegistrySubject(context));
+
+ final Optional<EncodingFormat<SerializationSchema<RowData>>>
keyEncodingFormat =
+ getKeyEncodingFormat(helper);
+
+ final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat
=
+ getValueEncodingFormat(helper);
+
+ helper.validateExcept(KafkaConnectorOptionsUtil.PROPERTIES_PREFIX);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final DeliveryGuarantee deliveryGuarantee =
validateDeprecatedSemantic(tableOptions);
+ validateTableSinkOptions(tableOptions);
+
+ KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
+
+ validatePKConstraints(
+ context.getObjectIdentifier(),
+ context.getPrimaryKeyIndexes(),
+ context.getCatalogTable().getOptions(),
+ valueEncodingFormat);
+
+ final DataType physicalDataType = context.getPhysicalRowDataType();
+
+ final int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+
+ final int[] valueProjection =
createValueFormatProjection(tableOptions, physicalDataType);
+
+ final String keyPrefix =
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+ final Integer parallelism =
tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+
+ return createKafkaTableSink(
+ physicalDataType,
+ keyEncodingFormat.orElse(null),
+ valueEncodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ tableOptions.get(TOPIC).get(0),
+ getKafkaProperties(context.getCatalogTable().getOptions()),
+ getFlinkKafkaPartitioner(tableOptions,
context.getClassLoader()).orElse(null),
+ deliveryGuarantee,
+ parallelism,
+ tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+ }
+
+ private static Optional<DecodingFormat<DeserializationSchema<RowData>>>
getKeyDecodingFormat(
+ TableFactoryHelper helper) {
+ final Optional<DecodingFormat<DeserializationSchema<RowData>>>
keyDecodingFormat =
+ helper.discoverOptionalDecodingFormat(
+ DeserializationFormatFactory.class, KEY_FORMAT);
+ keyDecodingFormat.ifPresent(
+ format -> {
+ if
(!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "A key format should only deal with
INSERT-only records. "
+ + "But %s has a changelog mode
of %s.",
+ helper.getOptions().get(KEY_FORMAT),
+ format.getChangelogMode()));
+ }
+ });
+ return keyDecodingFormat;
+ }
+
+ private static Optional<EncodingFormat<SerializationSchema<RowData>>>
getKeyEncodingFormat(
+ TableFactoryHelper helper) {
+ final Optional<EncodingFormat<SerializationSchema<RowData>>>
keyEncodingFormat =
+
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class,
KEY_FORMAT);
+ keyEncodingFormat.ifPresent(
+ format -> {
+ if
(!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "A key format should only deal with
INSERT-only records. "
+ + "But %s has a changelog mode
of %s.",
+ helper.getOptions().get(KEY_FORMAT),
+ format.getChangelogMode()));
+ }
+ });
+ return keyEncodingFormat;
+ }
+
+ private static DecodingFormat<DeserializationSchema<RowData>>
getValueDecodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalDecodingFormat(
+ DeserializationFormatFactory.class, FORMAT)
+ .orElseGet(
+ () -> helper.discoverDecodingFormat(
+ DeserializationFormatFactory.class,
VALUE_FORMAT));
+ }
+
+ private static EncodingFormat<SerializationSchema<RowData>>
getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalEncodingFormat(
+ SerializationFormatFactory.class, FORMAT)
+ .orElseGet(
+ () -> helper.discoverEncodingFormat(
+ SerializationFormatFactory.class,
VALUE_FORMAT));
+ }
+
+ private static void validatePKConstraints(
+ ObjectIdentifier tableName,
+ int[] primaryKeyIndexes,
+ Map<String, String> options,
+ Format format) {
+ if (primaryKeyIndexes.length > 0
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ Configuration configuration = Configuration.fromMap(options);
+ String formatName =
+ configuration
+ .getOptional(FORMAT)
+ .orElse(configuration.get(VALUE_FORMAT));
+ throw new ValidationException(
+ String.format(
+ "The Kafka table '%s' with '%s' format doesn't
support defining PRIMARY KEY constraint"
+ + " on the table, because it can't
guarantee the semantic of primary key.",
+ tableName.asSummaryString(), formatName));
+ }
+ }
+
+ private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig
tableOptions) {
+ if (tableOptions.getOptional(SINK_SEMANTIC).isPresent()) {
+ LOG.warn(
+ "{} is deprecated and will be removed. Please use {}
instead.",
+ SINK_SEMANTIC.key(),
+ DELIVERY_GUARANTEE.key());
+ return DeliveryGuarantee.valueOf(
+ tableOptions.get(SINK_SEMANTIC).toUpperCase().replace("-",
"_"));
+ }
+ return tableOptions.get(DELIVERY_GUARANTEE);
+ }
+
+ protected KafkaDynamicSource createKafkaTableSource(
+ DataType physicalDataType,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>>
keyDecodingFormat,
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
+ Properties properties,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets,
+ long startupTimestampMillis,
+ String tableIdentifier) {
+ return new KafkaDynamicSource(
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topics,
+ topicPattern,
+ properties,
+ startupMode,
+ specificStartupOffsets,
+ startupTimestampMillis,
+ false,
+ tableIdentifier);
+ }
+
+ protected KafkaDynamicSink createKafkaTableSink(
+ DataType physicalDataType,
+ @Nullable EncodingFormat<SerializationSchema<RowData>>
keyEncodingFormat,
+ EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<RowData> partitioner,
+ DeliveryGuarantee deliveryGuarantee,
+ Integer parallelism,
+ @Nullable String transactionalIdPrefix) {
+ return new KafkaDynamicSink(
+ physicalDataType,
+ physicalDataType,
+ keyEncodingFormat,
+ valueEncodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topic,
+ properties,
+ partitioner,
+ deliveryGuarantee,
+ false,
+ SinkBufferFlushMode.DISABLED,
+ parallelism,
+ transactionalIdPrefix);
+ }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
new file mode 100644
index 0000000000..2f61265c32
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.kafka.KafkaOptions;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
+import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+
+/** Upsert-Kafka factory. */
+public class UpsertKafkaDynamicTableFactory implements
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "upsert-kafka-inlong";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ options.add(TOPIC);
+ options.add(KEY_FORMAT);
+ options.add(VALUE_FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(KEY_FIELDS_PREFIX);
+ options.add(VALUE_FIELDS_INCLUDE);
+ options.add(SINK_PARALLELISM);
+ options.add(SINK_BUFFER_FLUSH_INTERVAL);
+ options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ options.add(Constants.INLONG_METRIC);
+ options.add(KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG);
+ options.add(KafkaOptions.SINK_MULTIPLE_PARTITION_PATTERN);
+ options.add(KafkaOptions.SINK_FIXED_IDENTIFIER);
+ options.add(KafkaConnectorOptions.SINK_PARTITIONER);
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+
+ ReadableConfig tableOptions = helper.getOptions();
+ DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
+
helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+
helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+
+ // Validate the option data type.
+ helper.validateExcept(PROPERTIES_PREFIX, Constants.DIRTY_PREFIX);
+ validateSource(
+ tableOptions,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ context.getPrimaryKeyIndexes());
+
+ Tuple2<int[], int[]> keyValueProjections =
+ createKeyValueProjections(context.getCatalogTable());
+ String keyPrefix =
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+ Properties properties =
getKafkaProperties(context.getCatalogTable().getOptions());
+ // always use earliest to keep data integrity
+ StartupMode earliest = StartupMode.EARLIEST;
+
+ return new KafkaDynamicSource(
+ context.getPhysicalRowDataType(),
+ keyDecodingFormat,
+ new DecodingFormatWrapper(valueDecodingFormat),
+ keyValueProjections.f0,
+ keyValueProjections.f1,
+ keyPrefix,
+ getSourceTopics(tableOptions),
+ getSourceTopicPattern(tableOptions),
+ properties,
+ earliest,
+ Collections.emptyMap(),
+ 0,
+ true,
+ context.getObjectIdentifier().asSummaryString());
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(
+ this, autoCompleteSchemaRegistrySubject(context));
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+
helper.discoverEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
+ EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+
helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
+
+ // Validate the option data type.
+ helper.validateExcept(PROPERTIES_PREFIX);
+ validateSink(
+ tableOptions,
+ keyEncodingFormat,
+ valueEncodingFormat,
+ context.getPrimaryKeyIndexes());
+
+ Tuple2<int[], int[]> keyValueProjections =
+ createKeyValueProjections(context.getCatalogTable());
+ final String keyPrefix =
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+ final Properties properties =
getKafkaProperties(context.getCatalogTable().getOptions());
+
+ Integer parallelism = tableOptions.get(SINK_PARALLELISM);
+
+ int batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+ Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+ SinkBufferFlushMode flushMode =
+ new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
+
+ // use {@link
org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+ // it will use hash partition if key is set else in round-robin
behaviour.
+ return new KafkaDynamicSink(
+ context.getPhysicalRowDataType(),
+ context.getPhysicalRowDataType(),
+ keyEncodingFormat,
+ new EncodingFormatWrapper(valueEncodingFormat),
+ keyValueProjections.f0,
+ keyValueProjections.f1,
+ keyPrefix,
+ tableOptions.get(TOPIC).get(0),
+ properties,
+ null,
+ DeliveryGuarantee.AT_LEAST_ONCE,
+ true,
+ flushMode,
+ parallelism,
+ tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+ }
+
+ private Tuple2<int[], int[]>
createKeyValueProjections(ResolvedCatalogTable catalogTable) {
+ ResolvedSchema schema = catalogTable.getResolvedSchema();
+ // the primary key has already been validated earlier, so get() is safe here
+ List<String> keyFields = schema.getPrimaryKey().get().getColumns();
+ DataType physicalDataType = schema.toPhysicalRowDataType();
+
+ Configuration tableOptions =
Configuration.fromMap(catalogTable.getOptions());
+ // upsert-kafka will set key.fields to primary key fields by default
+ tableOptions.set(KEY_FIELDS, keyFields);
+
+ int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+ int[] valueProjection = createValueFormatProjection(tableOptions,
physicalDataType);
+
+ return Tuple2.of(keyProjection, valueProjection);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Validation
+ //
--------------------------------------------------------------------------------------------
+
+ private static void validateSource(
+ ReadableConfig tableOptions,
+ Format keyFormat,
+ Format valueFormat,
+ int[] primaryKeyIndexes) {
+ validateTopic(tableOptions);
+ validateFormat(keyFormat, valueFormat, tableOptions);
+ validatePKConstraints(primaryKeyIndexes);
+ }
+
+ private static void validateSink(
+ ReadableConfig tableOptions,
+ Format keyFormat,
+ Format valueFormat,
+ int[] primaryKeyIndexes) {
+ validateTopic(tableOptions);
+ validateFormat(keyFormat, valueFormat, tableOptions);
+ validatePKConstraints(primaryKeyIndexes);
+ validateSinkBufferFlush(tableOptions);
+ }
+
+ private static void validateTopic(ReadableConfig tableOptions) {
+ List<String> topic = tableOptions.get(TOPIC);
+ if (topic.size() > 1) {
+ throw new ValidationException(
+ "The 'upsert-kafka' connector doesn't support topic list
now. "
+ + "Please use single topic as the value of the
parameter 'topic'.");
+ }
+ }
+
+ private static void validateFormat(
+ Format keyFormat, Format valueFormat, ReadableConfig tableOptions)
{
+ if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ String identifier = tableOptions.get(KEY_FORMAT);
+ throw new ValidationException(
+ String.format(
+ "'upsert-kafka' connector doesn't support '%s' as
key format, "
+ + "because '%s' is not in insert-only
mode.",
+ identifier, identifier));
+ }
+ if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ String identifier = tableOptions.get(VALUE_FORMAT);
+ throw new ValidationException(
+ String.format(
+ "'upsert-kafka' connector doesn't support '%s' as
value format, "
+ + "because '%s' is not in insert-only
mode.",
+ identifier, identifier));
+ }
+ }
+
+ private static void validatePKConstraints(int[] schema) {
+ if (schema.length == 0) {
+ throw new ValidationException(
+ "'upsert-kafka' tables require to define a PRIMARY KEY
constraint. "
+ + "The PRIMARY KEY specifies which columns should
be read from or write to the Kafka message key. "
+ + "The PRIMARY KEY also defines records in the
'upsert-kafka' table should update or delete on which keys.");
+ }
+ }
+
+ private static void validateSinkBufferFlush(ReadableConfig tableOptions) {
+ int flushMaxRows = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+ long flushIntervalMs =
tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis();
+ if (flushMaxRows > 0 && flushIntervalMs > 0) {
+ // flush is enabled
+ return;
+ }
+ if (flushMaxRows <= 0 && flushIntervalMs <= 0) {
+ // flush is disabled
+ return;
+ }
+ // exactly one of them is greater than zero, which is not allowed
+ throw new ValidationException(
+ String.format(
+ "'%s' and '%s' must be set to be greater than zero
together to enable sink buffer flushing.",
+ SINK_BUFFER_FLUSH_MAX_ROWS.key(),
SINK_BUFFER_FLUSH_INTERVAL.key()));
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Format wrapper
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * It is used to wrap the decoding format and expose the desired changelog
mode. It only works
+ * for insert-only formats.
+ */
+ protected static class DecodingFormatWrapper
+ implements
+ DecodingFormat<DeserializationSchema<RowData>> {
+
+ private final DecodingFormat<DeserializationSchema<RowData>>
innerDecodingFormat;
+
+ private static final ChangelogMode SOURCE_CHANGELOG_MODE =
+ ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+
+ public DecodingFormatWrapper(
+ DecodingFormat<DeserializationSchema<RowData>>
innerDecodingFormat) {
+ this.innerDecodingFormat = innerDecodingFormat;
+ }
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType producedDataType)
{
+ return innerDecodingFormat.createRuntimeDecoder(context,
producedDataType);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return SOURCE_CHANGELOG_MODE;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ DecodingFormatWrapper that = (DecodingFormatWrapper) obj;
+ return Objects.equals(innerDecodingFormat,
that.innerDecodingFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(innerDecodingFormat);
+ }
+ }
+
+ /**
+ * It is used to wrap the encoding format and expose the desired changelog
mode. It only works
+ * for insert-only formats.
+ */
+ protected static class EncodingFormatWrapper
+ implements
+ EncodingFormat<SerializationSchema<RowData>> {
+
+ private final EncodingFormat<SerializationSchema<RowData>>
innerEncodingFormat;
+
+ public static final ChangelogMode SINK_CHANGELOG_MODE =
+ ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+
+ public EncodingFormatWrapper(
+ EncodingFormat<SerializationSchema<RowData>>
innerEncodingFormat) {
+ this.innerEncodingFormat = innerEncodingFormat;
+ }
+
+ @Override
+ public SerializationSchema<RowData> createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType consumedDataType) {
+ return innerEncodingFormat.createRuntimeEncoder(context,
consumedDataType);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return SINK_CHANGELOG_MODE;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ EncodingFormatWrapper that = (EncodingFormatWrapper) obj;
+ return Objects.equals(innerEncodingFormat,
that.innerEncodingFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(innerEncodingFormat);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..7eeb7cd7a8
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.kafka.table.KafkaDynamicTableFactory
+org.apache.inlong.sort.kafka.table.UpsertKafkaDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 38c475a573..dd789c6bd5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -42,6 +42,7 @@
<module>tubemq</module>
<module>hbase</module>
<module>hudi</module>
+ <module>kafka</module>
</modules>
<properties>
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 3ac170a2d3..ec46cd86db 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -815,6 +815,12 @@
Source : flink-connector-hbase-2.2 1.15.4 (Please note that the software
have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
+1.3.20
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+ Source : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that
the software have been modified.)
+ License : https://github.com/apache/flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
diff --git a/pom.xml b/pom.xml
index 65086684c4..3d03a436ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1192,6 +1192,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>jdbc</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
@@ -1200,7 +1206,19 @@
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
- <artifactId>jdbc</artifactId>
+ <artifactId>postgresql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mongodb</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>clickhouse</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>