This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bd74989099 [Feature][Connector-V2] connector-kafka source support data conversion extracted by kafka connect source (#4516)
bd74989099 is described below
commit bd74989099ecf44a16fa2fc5c18940b8f28bf2d2
Author: Xiaojian Sun <[email protected]>
AuthorDate: Fri Aug 11 21:25:24 2023 +0800
    [Feature][Connector-V2] connector-kafka source support data conversion extracted by kafka connect source (#4516)
* Compatible kafka connect json #4137
---
.../formats/kafka-compatible-kafkaconnect-json.md | 47 ++++
docs/en/connector-v2/source/MyHours.md | 8 +-
seatunnel-connectors-v2/connector-kafka/pom.xml | 12 +
.../seatunnel/kafka/config/MessageFormat.java | 3 +-
.../seatunnel/kafka/source/KafkaSource.java | 6 +
.../seatunnel/kafka/source/KafkaSourceReader.java | 16 +-
.../connector-kafka-e2e/pom.xml | 5 +
.../e2e/connector/kafka/KafkaConnectToKafkaIT.java | 282 +++++++++++++++++++++
.../kafkasource_jdbc_record_to_mysql.conf | 63 +++++
.../seatunnel/engine/e2e/JobExecutionIT.java | 12 +-
seatunnel-formats/pom.xml | 1 +
.../pom.xml | 42 ++-
...ompatibleKafkaConnectDeserializationSchema.java | 213 ++++++++++++++++
.../json/KafkaConnectJsonFormatOptions.java | 49 ++++
14 files changed, 718 insertions(+), 41 deletions(-)
diff --git a/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md b/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
new file mode 100644
index 0000000000..7de8a9e838
--- /dev/null
+++ b/docs/en/connector-v2/formats/kafka-compatible-kafkaconnect-json.md
@@ -0,0 +1,47 @@
+# Kafka source compatible kafka-connect-json
+
+The SeaTunnel Kafka connector supports parsing data extracted through a Kafka Connect source, in particular data produced by the Kafka Connect JDBC and Kafka Connect Debezium source connectors.
+
+# How to use
+
+## Kafka output to mysql
+
+```bash
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "localhost:9092"
+ topic = "jdbc_source_record"
+ result_table_name = "kafka_table"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = COMPATIBLE_KAFKA_CONNECT_JSON
+ }
+}
+
+
+sink {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://localhost:3306/seatunnel"
+ user = st_user
+ password = seatunnel
+ generate_sink_sql = true
+ database = seatunnel
+ table = jdbc_sink
+ primary_keys = ["id"]
+ }
+}
+```
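+
+With the schema envelope enabled (the Kafka Connect JSON converter default), each record carries a `schema` and a `payload` field, and the format reads the row fields from `payload`. For illustration, a record emitted by a Kafka Connect JDBC source, like the ones used in this commit's e2e test, looks as follows:
+
+```json
+{
+  "schema": {
+    "type": "struct",
+    "fields": [
+      {"type": "int64", "optional": false, "field": "id"},
+      {"type": "string", "optional": true, "field": "name"},
+      {"type": "string", "optional": true, "field": "description"},
+      {"type": "string", "optional": true, "field": "weight"}
+    ],
+    "optional": false,
+    "name": "test_database_001.seatunnel_test_cdc"
+  },
+  "payload": {"id": 15, "name": "test", "description": "test", "weight": "20"}
+}
+```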
+
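+The commit also introduces `key_converter_schema_enabled` and `value_converter_schema_enabled` options (both default `true`, see `KafkaConnectJsonFormatOptions`), read from the same Kafka source config that carries `format`. A minimal sketch, assuming the upstream Kafka Connect producer was configured with `schemas.enable=false` and that these options sit next to `format` in the source block:
+
+```bash
+source {
+  Kafka {
+    # ... same connection and schema options as in the example above ...
+    format = COMPATIBLE_KAFKA_CONNECT_JSON
+    # assumed placement: read from the Kafka source config by the format
+    key_converter_schema_enabled = false
+    value_converter_schema_enabled = false
+  }
+}
+```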
diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md
index 87d9ab3ce3..f90d42ab1c 100644
--- a/docs/en/connector-v2/source/MyHours.md
+++ b/docs/en/connector-v2/source/MyHours.md
@@ -35,13 +35,13 @@ Used to read data from My Hours.
In order to use the My Hours connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.
-| Datasource | Supported Versions | Dependency                                                                                       |
-|------------|--------------------|------------------------------------------------------------------------------------------------|
-| My Hours   | universal          | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2)     |
+| Datasource | Supported Versions | Dependency                                                                                    |
+|------------|--------------------|-----------------------------------------------------------------------------------------------|
+| My Hours   | universal          | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2)  |
## Source Options
-| Name                        | Type    | Required | Default | Description                                                                                                                            |
+| Name                        | Type    | Required | Default | Description                                                                                                                             |
|-----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------|
| url                         | String  | Yes      | -       | Http request url.                                                                                                                      |
| email                       | String  | Yes      | -       | My hours login email address.                                                                                                          |
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 0ce4bba6b1..7955ab3f54 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -31,6 +31,7 @@
<properties>
<kafka.client.version>3.2.0</kafka.client.version>
+ <debezium.version>1.6.4.Final</debezium.version>
</properties>
<dependencies>
@@ -61,6 +62,17 @@
<artifactId>seatunnel-format-compatible-debezium-json</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-compatible-connect-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>${kafka.client.version}</version>
+ </dependency>
+
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 1ef29f6322..07f9a38ddf 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -22,5 +22,6 @@ public enum MessageFormat {
TEXT,
CANAL_JSON,
DEBEZIUM_JSON,
- COMPATIBLE_DEBEZIUM_JSON
+ COMPATIBLE_DEBEZIUM_JSON,
+ COMPATIBLE_KAFKA_CONNECT_JSON
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 30878e82a2..802d7986a9 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -45,6 +45,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatError
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
@@ -268,6 +269,11 @@ public class KafkaSource
.setIgnoreParseErrors(true)
.build();
break;
+ case COMPATIBLE_KAFKA_CONNECT_JSON:
+ deserializationSchema =
+ new CompatibleKafkaConnectDeserializationSchema(
+ typeInfo, config, false, false);
+ break;
case DEBEZIUM_JSON:
boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue();
if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 226fded240..a2d3bae2b4 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -150,9 +151,18 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                recordList) {
            try {
-               deserializationSchema.deserialize(
-                       record.value(), output);
-           } catch (Exception e) {
+               if (deserializationSchema
+                       instanceof
+                               CompatibleKafkaConnectDeserializationSchema) {
+                   ((CompatibleKafkaConnectDeserializationSchema)
+                                   deserializationSchema)
+                           .deserialize(
+                                   record, output);
+               } else {
+                   deserializationSchema.deserialize(
+                           record.value(), output);
+               }
+           } catch (IOException e) {
                if (this.messageFormatErrorHandleWay
                        == MessageFormatErrorHandleWay.SKIP) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index 81cbb78569..fa2e1930cc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -92,6 +92,11 @@
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java
new file mode 100644
index 0000000000..591049917f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaConnectToKafkaIT.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK})
+public class KafkaConnectToKafkaIT extends TestSuiteBase implements TestResource {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectToKafkaIT.class);
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ // kafka
+ private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest";
+
+ private static final String KAFKA_JDBC_TOPIC = "jdbc_source_record";
+
+ private static final String KAFKA_HOST = "kafka_connect_source_record";
+
+ private static KafkaContainer KAFKA_CONTAINER;
+
+ private KafkaProducer<byte[], byte[]> kafkaProducer;
+
+ // -----------------------------------mysql-----------------------------------------
+ private static MySqlContainer MYSQL_CONTAINER;
+ private static final String MYSQL_DATABASE = "seatunnel";
+ private static final String MYSQL_HOST = "kafka_to_mysql_e2e";
+ private static final int MYSQL_PORT = 3306;
+ private static final String MYSQL_DRIVER_JAR =
+ "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + MYSQL_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ MySqlContainer mySqlContainer =
+ new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName("seatunnel")
+ .withUsername("st_user")
+ .withPassword("seatunnel")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ mySqlContainer.setPortBindings(
+ com.google.common.collect.Lists.newArrayList(
+ String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+ return mySqlContainer;
+ }
+
+ private void createKafkaContainer() {
+ KAFKA_CONTAINER =
+ new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() {
+
+ LOG.info("The first stage: Starting Kafka containers...");
+ createKafkaContainer();
+ Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+ LOG.info("Kafka Containers are started");
+
+ given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initKafkaProducer);
+
+ LOG.info("The second stage: Starting Mysql containers...");
+ MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeDatabase);
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeJdbcTable);
+
+ log.info("Write 3 records to topic " + KAFKA_JDBC_TOPIC);
+ generateConnectJdbcRecord();
+ }
+
+ @TestTemplate
+ public void testJdbcRecordKafkaToMysql(TestContainer container)
+ throws IOException, InterruptedException, SQLException {
+ Container.ExecResult execResult =
+ container.executeJob("/kafkasource_jdbc_record_to_mysql.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ List<Object> actual = new ArrayList<>();
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = connection.createStatement()) {
+ ResultSet resultSet =
+ statement.executeQuery("select * from seatunnel.jdbc_sink order by id");
+ while (resultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+ resultSet.getInt("id"),
+ resultSet.getString("name"),
+ resultSet.getString("description"),
+ resultSet.getString("weight"));
+ actual.add(row);
+ }
+ }
+ }
+ List<Object> expected =
+ Lists.newArrayList(
+ Arrays.asList(15, "test", "test", "20"),
+ Arrays.asList(16, "test-001", "test", "30"),
+ Arrays.asList(18, "sdc", "sdc", "sdc"));
+ Assertions.assertIterableEquals(expected, actual);
+
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute("truncate table seatunnel.jdbc_sink");
+ LOG.info("testJdbcRecordKafkaToMysql truncate table sink");
+ }
+ }
+ }
+
+ @SneakyThrows
+ public void generateConnectJdbcRecord() {
+ String[] jdbcSourceRecords = {
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"string\",\"optional\":true,\"field\":\"weight\"}],\"optional\":false,\"name\":\"test_database_001.seatunnel_test_cdc\"},\"payload\":{\"id\":15,\"name\":\"test\",\"description\":\"test\",\"weight\":\"20\"}}",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"string\",\"optional\":true,\"field\":\"weight\"}],\"optional\":false,\"name\":\"test_database_001.seatunnel_test_cdc\"},\"payload\":{\"id\":16,\"name\":\"test-001\",\"description\":\"test\",\"weight\":\"30\"}}",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int64\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"string\",\"optional\":true,\"field\":\"weight\"}],\"optional\":false,\"name\":\"test_database_001.seatunnel_test_cdc\"},\"payload\":{\"id\":18,\"name\":\"sdc\",\"description\":\"sdc\",\"weight\":\"sdc\"}}"
+ };
+ for (String value : jdbcSourceRecords) {
+ JsonNode jsonNode = objectMapper.readTree(value);
+ byte[] bytes = objectMapper.writeValueAsBytes(jsonNode);
+ ProducerRecord<byte[], byte[]> producerRecord =
+ new ProducerRecord<>(KAFKA_JDBC_TOPIC, null, bytes);
+ kafkaProducer.send(producerRecord).get();
+ }
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ kafkaProducer = new KafkaProducer<>(props);
+ }
+
+ @Override
+ public void tearDown() {
+ MYSQL_CONTAINER.close();
+ KAFKA_CONTAINER.close();
+ }
+
+ protected void initializeDatabase() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE DATABASE IF NOT EXISTS " + MYSQL_DATABASE;
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Mysql database failed!", e);
+ }
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ Statement statement = connection.createStatement();
+ String jdbcSink =
+ "CREATE TABLE IF NOT EXISTS seatunnel.jdbc_sink(\n"
+ + "id INT NOT NULL PRIMARY KEY,\n"
+ + "name varchar(255),\n"
+ + "description varchar(255),\n"
+ + "weight varchar(255)"
+ + ")";
+ statement.execute(jdbcSink);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Mysql table failed!", e);
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_jdbc_record_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_jdbc_record_to_mysql.conf
new file mode 100644
index 0000000000..36ae276e03
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_jdbc_record_to_mysql.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_connect_source_record:9092"
+ topic = "jdbc_source_record"
+ result_table_name = "kafka_table"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = COMPATIBLE_KAFKA_CONNECT_JSON
+ }
+}
+
+
+sink {
+ Jdbc {
+ driver = com.mysql.cj.jdbc.Driver
+ url = "jdbc:mysql://kafka_to_mysql_e2e:3306/seatunnel"
+ user = st_user
+ password = seatunnel
+ generate_sink_sql = true
+ database = seatunnel
+ table = jdbc_sink
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 501e763e3c..6e5ebcf587 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -146,11 +145,6 @@ public class JobExecutionIT {
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus = clientJobProxy.getJobStatus();
- while (jobStatus == JobStatus.RUNNING) {
- Thread.sleep(1000L);
- jobStatus = clientJobProxy.getJobStatus();
- }
CompletableFuture<JobResult> completableFuture =
CompletableFuture.supplyAsync(
@@ -165,16 +159,14 @@ public class JobExecutionIT {
new RetryUtils.RetryMaterial(
100000,
true,
- exception ->
- ExceptionUtil.isOperationNeedRetryException(
- exception),
+ exception -> true,
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
- await().atMost(600000, TimeUnit.MILLISECONDS)
+ await().atMost(6000000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(completableFuture.isDone()));
JobResult result = completableFuture.get();
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
index 983a8629ce..7fc09b356a 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/pom.xml
@@ -30,6 +30,7 @@
<module>seatunnel-format-json</module>
<module>seatunnel-format-text</module>
<module>seatunnel-format-compatible-debezium-json</module>
+ <module>seatunnel-format-compatible-connect-json</module>
</modules>
</project>
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml
similarity index 70%
copy from seatunnel-connectors-v2/connector-kafka/pom.xml
copy to seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml
index 0ce4bba6b1..d3d5545742 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/pom.xml
@@ -1,66 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
-
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
http://www.apache.org/licenses/LICENSE-2.0
-
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-connectors-v2</artifactId>
+ <artifactId>seatunnel-formats</artifactId>
<version>${revision}</version>
</parent>
- <artifactId>connector-kafka</artifactId>
- <name>SeaTunnel : Connectors V2 : Kafka</name>
-
+ <artifactId>seatunnel-format-compatible-connect-json</artifactId>
+ <name>SeaTunnel : Formats : Compatible Kafka Connect Json</name>
<properties>
- <kafka.client.version>3.2.0</kafka.client.version>
+ <debezium.version>1.6.4.Final</debezium.version>
</properties>
<dependencies>
-
- <!-- TODO add to dependency management after version unify-->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-common</artifactId>
+ <artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.client.version}</version>
- </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
+
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-format-text</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.2.0</version>
+ <scope>provided</scope>
</dependency>
+
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-format-compatible-debezium-json</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>3.2.0</version>
+ <scope>provided</scope>
</dependency>
+
</dependencies>
</project>
diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
new file mode 100644
index 0000000000..b2e6ac97e9
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.kafka.connect.json;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** Compatible kafka connect deserialization schema */
+@RequiredArgsConstructor
+public class CompatibleKafkaConnectDeserializationSchema
+ implements DeserializationSchema<SeaTunnelRow> {
+
+ private static final String INCLUDE_SCHEMA_METHOD = "convertToJsonWithEnvelope";
+ private static final String EXCLUDE_SCHEMA_METHOD = "convertToJsonWithoutEnvelope";
+ private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
+ private transient JsonConverter keyConverter;
+ private transient JsonConverter valueConverter;
+ private transient Method keyConverterMethod;
+ private transient Method valueConverterMethod;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final JsonToRowConverters.JsonToRowConverter runtimeConverter;
+ private final boolean keySchemaEnable;
+ private final boolean valueSchemaEnable;
+ /** Object mapper for parsing the JSON. */
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public CompatibleKafkaConnectDeserializationSchema(
+ @NonNull SeaTunnelRowType seaTunnelRowType,
+ @NonNull Config config,
+ boolean failOnMissingField,
+ boolean ignoreParseErrors) {
+
+ Map<String, String> configMap = ReadonlyConfig.fromConfig(config).toMap();
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.keySchemaEnable =
+ KafkaConnectJsonFormatOptions.getKeyConverterSchemaEnabled(configMap);
+ this.valueSchemaEnable =
+ KafkaConnectJsonFormatOptions.getValueConverterSchemaEnabled(configMap);
+
+ // Runtime converter
+ this.runtimeConverter =
+ new JsonToRowConverters(failOnMissingField, ignoreParseErrors)
+ .createConverter(checkNotNull(seaTunnelRowType));
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ throw new UnsupportedEncodingException();
+ }
+
+ /**
+ * Deserialize a Kafka consumer record that carries a Kafka Connect JSON envelope.
+ *
+ * @param msg the consumer record to deserialize
+ * @param out collector that receives the converted SeaTunnelRow(s)
+ * @throws InvocationTargetException if the reflective JsonConverter call fails
+ * @throws IllegalAccessException if the reflective JsonConverter method is inaccessible
+ */
+ public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow> out)
+ throws InvocationTargetException, IllegalAccessException {
+ tryInitConverter();
+ SinkRecord record = convertToSinkRecord(msg);
+ RowKind rowKind = RowKind.INSERT;
+ JsonNode jsonNode =
+ (JsonNode)
+ valueConverterMethod.invoke(
+ valueConverter, record.valueSchema(), record.value());
+ JsonNode payload = jsonNode.get(KAFKA_CONNECT_SINK_RECORD_PAYLOAD);
+ if (payload.isArray()) {
+ ArrayNode arrayNode = (ArrayNode) payload;
+ for (int i = 0; i < arrayNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(arrayNode.get(i));
+ row.setRowKind(rowKind);
+ out.collect(row);
+ }
+ } else {
+ SeaTunnelRow row = convertJsonNode(payload);
+ row.setRowKind(rowKind);
+ out.collect(row);
+ }
+ }
+
+ private SeaTunnelRow convertJsonNode(JsonNode jsonNode) {
+ if (jsonNode.isNull()) {
+ return null;
+ }
+ try {
+ org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode jsonData =
+ objectMapper.readTree(jsonNode.toString());
+ return (SeaTunnelRow) runtimeConverter.convert(jsonData);
+ } catch (Throwable t) {
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.JSON_OPERATION_FAILED,
+ String.format("Failed to deserialize JSON '%s'.", jsonNode),
+ t);
+ }
+ }
+
+ private SinkRecord convertToSinkRecord(ConsumerRecord<byte[], byte[]> msg) {
+ SchemaAndValue keyAndSchema =
+ (msg.key() == null)
+ ? SchemaAndValue.NULL
+ : keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
+ SchemaAndValue valueAndSchema =
+ valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
+ return new SinkRecord(
+ msg.topic(),
+ msg.partition(),
+ keyAndSchema.schema(),
+ keyAndSchema.value(),
+ valueAndSchema.schema(),
+ valueAndSchema.value(),
+ msg.offset(),
+ msg.timestamp(),
+ msg.timestampType(),
+ null);
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return seaTunnelRowType;
+ }
+
+ private void tryInitConverter() {
+ if (keyConverter == null) {
+ synchronized (this) {
+ if (keyConverter == null) {
+ keyConverter = new JsonConverter();
+ keyConverter.configure(
+ Collections.singletonMap(
+ JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, keySchemaEnable),
+ true);
+ keyConverterMethod =
+ ReflectionUtils.getDeclaredMethod(
+ JsonConverter.class,
+ keySchemaEnable
+ ? INCLUDE_SCHEMA_METHOD
+ : EXCLUDE_SCHEMA_METHOD,
+ Schema.class,
+ Object.class)
+ .get();
+ }
+ }
+ }
+ if (valueConverter == null) {
+ synchronized (this) {
+ if (valueConverter == null) {
+ valueConverter = new JsonConverter();
+ valueConverter.configure(
+ Collections.singletonMap(
+ JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, valueSchemaEnable),
+ false);
+ valueConverterMethod =
+ ReflectionUtils.getDeclaredMethod(
+ JsonConverter.class,
+ valueSchemaEnable
+ ? INCLUDE_SCHEMA_METHOD
+ : EXCLUDE_SCHEMA_METHOD,
+ Schema.class,
+ Object.class)
+ .get();
+ }
+ }
+ }
+ }
+}
diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/KafkaConnectJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/KafkaConnectJsonFormatOptions.java
new file mode 100644
index 0000000000..05e16e0abb
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/KafkaConnectJsonFormatOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.kafka.connect.json;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Map;
+
+public class KafkaConnectJsonFormatOptions {
+
+ public static final Option<Boolean> KEY_CONVERTER_SCHEMA_ENABLED =
+ Options.key("key_converter_schema_enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("kafka connect key converter schema enabled.");
+
+ public static final Option<Boolean> VALUE_CONVERTER_SCHEMA_ENABLED =
+ Options.key("value_converter_schema_enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("kafka connect value converter schema enabled.");
+
+ public static boolean getKeyConverterSchemaEnabled(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(KEY_CONVERTER_SCHEMA_ENABLED.key(), "true"));
+ }
+
+ public static boolean getValueConverterSchemaEnabled(Map<String, String> options) {
+ return Boolean.parseBoolean(
+ options.getOrDefault(VALUE_CONVERTER_SCHEMA_ENABLED.key(), "true"));
+ }
+}