hailin0 commented on code in PR #3950:
URL: 
https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093942584


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    
//----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = 
"confluentinc/cp-kafka:latest";
+
+    private static final String KAFKA_TOPIC = "test-canal-sink";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    
//----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        
mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            
.withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"),
 "/app/server/conf/canal.properties")
+            
.withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"),
 "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        
CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new 
KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        
KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) 
throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        ArrayList<Object> result = new ArrayList<>();
+        List<String> expectedResult =
+                Arrays.asList(
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}"

Review Comment:
   Move these hard-coded expected-result strings into a separate resource file.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java:
##########
@@ -51,22 +49,12 @@ public DeserializationFormat 
createDeserializationFormat(TableFactoryContext con
         boolean ignoreParseErrors = 
JsonFormatOptions.getIgnoreParseErrors(options);
 
         // TODO config SeaTunnelRowType
-        return new DeserializationFormat() {
-            @Override
-            public DeserializationSchema createDeserializationSchema() {
-                return new JsonDeserializationSchema(failOnMissingField, 
ignoreParseErrors, null);
-            }
-        };
+        return () -> new JsonDeserializationSchema(failOnMissingField, 
ignoreParseErrors, null);

Review Comment:
   revert



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData 
{@link
+ * DeserializationSchema}.
+ */
+
+public class CanalJsonFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "canal-json";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // TODO config option rules
+        return OptionRule.builder().build();
+    }
+
+    @Override
+    public SerializationFormat createSerializationFormat(TableFactoryContext 
context) {
+        return () -> new CanalJsonSerializationSchema(null);

Review Comment:
   lambda cannot serialize and deserialize



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java:
##########
@@ -51,22 +49,12 @@ public DeserializationFormat 
createDeserializationFormat(TableFactoryContext con
         boolean ignoreParseErrors = 
JsonFormatOptions.getIgnoreParseErrors(options);
 
         // TODO config SeaTunnelRowType
-        return new DeserializationFormat() {
-            @Override
-            public DeserializationSchema createDeserializationSchema() {
-                return new JsonDeserializationSchema(failOnMissingField, 
ignoreParseErrors, null);
-            }
-        };
+        return () -> new JsonDeserializationSchema(failOnMissingField, 
ignoreParseErrors, null);
     }
 
     @Override
     public SerializationFormat createSerializationFormat(TableFactoryContext 
context) {
         // TODO config SeaTunnelRowType
-        return new SerializationFormat() {
-            @Override
-            public SerializationSchema createSerializationSchema() {
-                return new JsonSerializationSchema(null);
-            }
-        };
+        return () -> new JsonSerializationSchema(null);

Review Comment:
   revert



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.canal;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.connector.DeserializationFormat;
+import org.apache.seatunnel.api.table.connector.SerializationFormat;
+import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.SerializationFormatFactory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+
+import java.util.Map;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData 
{@link
+ * DeserializationSchema}.
+ */
+
+public class CanalJsonFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "canal-json";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        // TODO config option rules
+        return OptionRule.builder().build();
+    }
+
+    @Override
+    public SerializationFormat createSerializationFormat(TableFactoryContext 
context) {
+        return () -> new CanalJsonSerializationSchema(null);
+    }
+
+    @Override
+    public DeserializationFormat 
createDeserializationFormat(TableFactoryContext context) {
+        Map<String, String> options = context.getOptions();
+        boolean ignoreParseErrors = 
CanalJsonFormatOptions.getIgnoreParseErrors(options);
+        String  databaseInclude = 
CanalJsonFormatOptions.getDatabaseInclude(options);
+        String  tableInclude = CanalJsonFormatOptions.getTableInclude(options);
+
+        // TODO config SeaTunnelRowType
+        return () -> new CanalJsonDeserializationSchema(null, databaseInclude, 
tableInclude, ignoreParseErrors);

Review Comment:
   lambda cannot serialize and deserialize



##########
docs/en/connector-v2/formats/canal-json.md:
##########
@@ -0,0 +1,109 @@
+# Canal Format
+
Changelog-Data-Capture format. Supports both a serialization schema and a 
deserialization schema.
+
+Canal is a CDC (Changelog Data Capture) tool that can stream changes in 
real-time from MySQL into other systems. Canal provides a unified format schema 
for changelog and supports to serialize messages using JSON and protobuf 
(protobuf is the default format for Canal).
+
+Seatunnel supports interpreting Canal JSON messages as INSERT/UPDATE/DELETE 
messages in the Seatunnel system. This feature is useful in many cases, 
such as
+
+    synchronizing incremental data from databases to other systems
+    auditing logs
+    real-time materialized views on databases
+    temporal join changing history of a database table and so on.
+
+Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in 
Seatunnel as Canal JSON messages, and emit to storage like Kafka. However, 
currently Seatunnel can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single 
UPDATE message. Therefore, Seatunnel encodes UPDATE_BEFORE and UPDATE_AFTER as 
DELETE and INSERT Canal messages.
+
+# How to use Canal format
+Canal provides a unified format for changelog, here is a simple example for an 
update operation captured from a MySQL products table:
+```bash
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+}
+```
+Note: please refer to the Canal documentation for the meaning of each field.
+
+The MySQL products table has 4 columns (id, name, description and weight). 
+The above JSON message is an update change event on the products table where 
the weight value of the row with id = 111 is changed from 5.18 to 5.15. 
+Assuming the messages have been synchronized to Kafka topic products_binlog, 
then we can use the following Seatunnel to consume this topic and interpret the 
change events.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test-canal-source"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = canal-json
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "localhost:9092"
+    topic = "test-canal-sink"
+  }
+}
+```
+
+# Format Options

Review Comment:
   move to top



##########
docs/en/connector-v2/formats/canal-json.md:
##########
@@ -0,0 +1,109 @@
+# Canal Format
+
Changelog-Data-Capture format. Supports both a serialization schema and a 
deserialization schema.
+
+Canal is a CDC (Changelog Data Capture) tool that can stream changes in 
real-time from MySQL into other systems. Canal provides a unified format schema 
for changelog and supports to serialize messages using JSON and protobuf 
(protobuf is the default format for Canal).
+
+Seatunnel supports interpreting Canal JSON messages as INSERT/UPDATE/DELETE 
messages in the Seatunnel system. This feature is useful in many cases, 
such as
+
+    synchronizing incremental data from databases to other systems
+    auditing logs
+    real-time materialized views on databases
+    temporal join changing history of a database table and so on.
+
+Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in 
Seatunnel as Canal JSON messages, and emit to storage like Kafka. However, 
currently Seatunnel can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single 
UPDATE message. Therefore, Seatunnel encodes UPDATE_BEFORE and UPDATE_AFTER as 
DELETE and INSERT Canal messages.
+
+# How to use Canal format
+Canal provides a unified format for changelog, here is a simple example for an 
update operation captured from a MySQL products table:
+```bash
+{
+  "data": [
+    {
+      "id": "111",
+      "name": "scooter",
+      "description": "Big 2-wheel scooter",
+      "weight": "5.18"
+    }
+  ],
+  "database": "inventory",
+  "es": 1589373560000,
+  "id": 9,
+  "isDdl": false,
+  "mysqlType": {
+    "id": "INTEGER",
+    "name": "VARCHAR(255)",
+    "description": "VARCHAR(512)",
+    "weight": "FLOAT"
+  },
+  "old": [
+    {
+      "weight": "5.15"
+    }
+  ],
+  "pkNames": [
+    "id"
+  ],
+  "sql": "",
+  "sqlType": {
+    "id": 4,
+    "name": 12,
+    "description": 12,
+    "weight": 7
+  },
+  "table": "products",
+  "ts": 1589373560798,
+  "type": "UPDATE"
+}
+```
+Note: please refer to the Canal documentation for the meaning of each field.
+
+The MySQL products table has 4 columns (id, name, description and weight). 
+The above JSON message is an update change event on the products table where 
the weight value of the row with id = 111 is changed from 5.18 to 5.15. 
+Assuming the messages have been synchronized to Kafka topic products_binlog, 
then we can use the following Seatunnel to consume this topic and interpret the 
change events.
+
+```bash
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test-canal-source"
+    result_table_name = "kafka_name"
+    start_mode = earliest
+    schema = {
+      fields {
+           id = "int"
+           name = "string"
+           description = "string"
+           weight = "string"
+      }
+    },
+    format = canal-json
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource

Review Comment:
   remove
   



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CannalToKafakIT.class);
+
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    private static final int CANAL_PORT = 11111;
+
+    
//----------------------------------------------------------------------------
+    // kafka
+    private static final String KAFKA_IMAGE_NAME = 
"confluentinc/cp-kafka:latest";
+
+    private static final String KAFKA_TOPIC = "test-canal-sink";
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    
//----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql_e2e";
+
+    private static final int MYSQL_PORT = 3306;
+
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+        new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer = new MySqlContainer(version)
+            .withConfigurationOverride("docker/server-gtids/my.cnf")
+            .withSetupSQL("docker/setup.sql")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MYSQL_HOST)
+            .withDatabaseName("canal")
+            .withUsername("st_user")
+            .withPassword("seatunnel")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        
mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", MYSQL_PORT, MYSQL_PORT)));
+        return mySqlContainer;
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE)
+            
.withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"),
 "/app/server/conf/canal.properties")
+            
.withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"),
 "/app/server/conf/example/instance.properties")
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CANAL_HOST)
+            .withCommand()
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+        
CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", CANAL_PORT, CANAL_PORT)));
+    }
+
+    private void createKafkaContainer(){
+        KAFKA_CONTAINER = new 
KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+        
KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        LOG.info("The first stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Containers are started");
+
+        inventoryDatabase.createAndInitialize();
+
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+    }
+
+    @TestTemplate
+    public void testCannalToKafakCannalFormatAnalysis(TestContainer container) 
throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        ArrayList<Object> result = new ArrayList<>();
+        List<String> expectedResult =
+                Arrays.asList(
+                        
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car 
battery\",\"description\":\"12V car 
battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill 
bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to 
#3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's 
hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's 
hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}",
+                        
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent 
black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}"
+                        );
+
+        ArrayList<String> topics = new ArrayList<>();
+        topics.add(KAFKA_TOPIC);
+        kafkaConsumer.subscribe(topics);
+        while (result.size() < 9) {

Review Comment:
   Infinite loop



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java:
##########
@@ -51,22 +49,12 @@ public DeserializationFormat 
createDeserializationFormat(TableFactoryContext con
         boolean ignoreParseErrors = 
JsonFormatOptions.getIgnoreParseErrors(options);
 
         // TODO config SeaTunnelRowType
-        return new DeserializationFormat() {
-            @Override
-            public DeserializationSchema createDeserializationSchema() {
-                return new JsonDeserializationSchema(failOnMissingField, 
ignoreParseErrors, null);
-            }
-        };
+        return () -> new JsonDeserializationSchema(failOnMissingField, 
ignoreParseErrors, null);

Review Comment:
   lambda cannot serialize and deserialize



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to