hailin0 commented on code in PR #3950: URL: https://github.com/apache/incubator-seatunnel/pull/3950#discussion_r1093942584
########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.kafka; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import 
org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class CannalToKafakIT extends TestSuiteBase implements TestResource { + + private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class); + + private static GenericContainer<?> CANAL_CONTAINER; + + private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6"; + + private static final String CANAL_HOST = "canal_e2e"; + + private static final int CANAL_PORT = 11111; + + //---------------------------------------------------------------------------- + // kafka + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest"; + + private static final String KAFKA_TOPIC = "test-canal-sink"; + + private static final int KAFKA_PORT = 9093; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private static KafkaContainer KAFKA_CONTAINER; + + private KafkaConsumer<String, String> kafkaConsumer; + + //---------------------------------------------------------------------------- + // mysql + private static final String MYSQL_HOST = "mysql_e2e"; + + private static final int MYSQL_PORT = 3306; + + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw"); + + private static MySqlContainer 
createMySqlContainer(MySqlVersion version) { + MySqlContainer mySqlContainer = new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName("canal") + .withUsername("st_user") + .withPassword("seatunnel") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", MYSQL_PORT, MYSQL_PORT))); + return mySqlContainer; + } + + private void createCanalContainer() { + CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE) + .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties") + .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties") + .withNetwork(NETWORK) + .withNetworkAliases(CANAL_HOST) + .withCommand() + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE))); + CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", CANAL_PORT, CANAL_PORT))); + } + + private void createKafkaContainer(){ + KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + } + + @BeforeAll + @Override + public void startUp() { + + LOG.info("The third stage: Starting Kafka containers..."); + createKafkaContainer(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + LOG.info("Containers are started"); + + LOG.info("The first stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + 
LOG.info("Containers are started"); + + LOG.info("The first stage: Starting Canal containers..."); + createCanalContainer(); + Startables.deepStart(Stream.of(CANAL_CONTAINER)).join(); + LOG.info("Containers are started"); + + inventoryDatabase.createAndInitialize(); + + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initKafkaConsumer); + } + + @TestTemplate + public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + ArrayList<Object> result = new ArrayList<>(); + List<String> expectedResult = + Arrays.asList( + "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}", + 
"{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}" Review Comment: mv to file ########## seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java: ########## @@ -51,22 +49,12 @@ public DeserializationFormat createDeserializationFormat(TableFactoryContext con boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options); // TODO config SeaTunnelRowType - return new DeserializationFormat() { - @Override - public DeserializationSchema createDeserializationSchema() { - return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null); - } - }; + return () -> new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null); Review Comment: revert ########## seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.json.canal; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.connector.DeserializationFormat; +import org.apache.seatunnel.api.table.connector.SerializationFormat; +import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory; +import org.apache.seatunnel.api.table.factory.SerializationFormatFactory; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; + +import java.util.Map; + +/** + * Format factory for providing configured instances of Canal JSON to RowData {@link + * DeserializationSchema}. + */ + +public class CanalJsonFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { + + public static final String IDENTIFIER = "canal-json"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + // TODO config option rules + return OptionRule.builder().build(); + } + + @Override + public SerializationFormat createSerializationFormat(TableFactoryContext context) { + return () -> new CanalJsonSerializationSchema(null); Review Comment: lambda cannot serialize and deserialize ########## seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java: ########## @@ -51,22 +49,12 @@ public DeserializationFormat createDeserializationFormat(TableFactoryContext con boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options); // TODO config SeaTunnelRowType - return new DeserializationFormat() { - @Override - public DeserializationSchema createDeserializationSchema() { - return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null); - } - }; + return () -> new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null); } @Override public SerializationFormat 
createSerializationFormat(TableFactoryContext context) { // TODO config SeaTunnelRowType - return new SerializationFormat() { - @Override - public SerializationSchema createSerializationSchema() { - return new JsonSerializationSchema(null); - } - }; + return () -> new JsonSerializationSchema(null); Review Comment: revert ########## seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.json.canal; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.connector.DeserializationFormat; +import org.apache.seatunnel.api.table.connector.SerializationFormat; +import org.apache.seatunnel.api.table.factory.DeserializationFormatFactory; +import org.apache.seatunnel.api.table.factory.SerializationFormatFactory; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; + +import java.util.Map; + +/** + * Format factory for providing configured instances of Canal JSON to RowData {@link + * DeserializationSchema}. 
+ */ + +public class CanalJsonFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { + + public static final String IDENTIFIER = "canal-json"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + // TODO config option rules + return OptionRule.builder().build(); + } + + @Override + public SerializationFormat createSerializationFormat(TableFactoryContext context) { + return () -> new CanalJsonSerializationSchema(null); + } + + @Override + public DeserializationFormat createDeserializationFormat(TableFactoryContext context) { + Map<String, String> options = context.getOptions(); + boolean ignoreParseErrors = CanalJsonFormatOptions.getIgnoreParseErrors(options); + String databaseInclude = CanalJsonFormatOptions.getDatabaseInclude(options); + String tableInclude = CanalJsonFormatOptions.getTableInclude(options); + + // TODO config SeaTunnelRowType + return () -> new CanalJsonDeserializationSchema(null, databaseInclude, tableInclude, ignoreParseErrors); Review Comment: lambda cannot serialize and deserialize ########## docs/en/connector-v2/formats/canal-json.md: ########## @@ -0,0 +1,109 @@ +# Canal Format + +Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema + +Canal is a CDC (Changelog Data Capture) tool that can stream changes in real-time from MySQL into other systems. Canal provides a unified format schema for changelog and supports to serialize messages using JSON and protobuf (protobuf is the default format for Canal). + +Seatunnel supports to interpret Canal JSON messages as INSERT/UPDATE/DELETE messages into seatunnel system. This is useful in many cases to leverage this feature, such as + + synchronizing incremental data from databases to other systems + auditing logs + real-time materialized views on databases + temporal join changing history of a database table and so on. 
+ +Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel as Canal JSON messages, and emit to storage like Kafka. However, currently Seatunnel can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Seatunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages. + +# How to use Canal format +Canal provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table: +```bash +{ + "data": [ + { + "id": "111", + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": "5.18" + } + ], + "database": "inventory", + "es": 1589373560000, + "id": 9, + "isDdl": false, + "mysqlType": { + "id": "INTEGER", + "name": "VARCHAR(255)", + "description": "VARCHAR(512)", + "weight": "FLOAT" + }, + "old": [ + { + "weight": "5.15" + } + ], + "pkNames": [ + "id" + ], + "sql": "", + "sqlType": { + "id": 4, + "name": 12, + "description": 12, + "weight": 7 + }, + "table": "products", + "ts": 1589373560798, + "type": "UPDATE" +} +``` +Note: please refer to Canal documentation about the meaning of each field. + +The MySQL products table has 4 columns (id, name, description and weight). +The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.15 to 5.18. +Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel configuration to consume this topic and interpret the change events.
+ +```bash +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "test-canal-source" + result_table_name = "kafka_name" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = canal-json + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { +} + +sink { + Kafka { + bootstrap.servers = "localhost:9092" + topic = "test-canal-sink" + } +} +``` + +# Format Options Review Comment: move to top ########## docs/en/connector-v2/formats/canal-json.md: ########## @@ -0,0 +1,109 @@ +# Canal Format + +Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema + +Canal is a CDC (Changelog Data Capture) tool that can stream changes in real-time from MySQL into other systems. Canal provides a unified format schema for changelog and supports to serialize messages using JSON and protobuf (protobuf is the default format for Canal). + +Seatunnel supports to interpret Canal JSON messages as INSERT/UPDATE/DELETE messages into seatunnel system. This is useful in many cases to leverage this feature, such as + + synchronizing incremental data from databases to other systems + auditing logs + real-time materialized views on databases + temporal join changing history of a database table and so on. + +Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel as Canal JSON messages, and emit to storage like Kafka. However, currently Seatunnel can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Seatunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages. 
+ +# How to use Canal format +Canal provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table: +```bash +{ + "data": [ + { + "id": "111", + "name": "scooter", + "description": "Big 2-wheel scooter", + "weight": "5.18" + } + ], + "database": "inventory", + "es": 1589373560000, + "id": 9, + "isDdl": false, + "mysqlType": { + "id": "INTEGER", + "name": "VARCHAR(255)", + "description": "VARCHAR(512)", + "weight": "FLOAT" + }, + "old": [ + { + "weight": "5.15" + } + ], + "pkNames": [ + "id" + ], + "sql": "", + "sqlType": { + "id": 4, + "name": 12, + "description": 12, + "weight": 7 + }, + "table": "products", + "ts": 1589373560798, + "type": "UPDATE" +} +``` +Note: please refer to Canal documentation about the meaning of each field. + +The MySQL products table has 4 columns (id, name, description and weight). +The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.15 to 5.18. +Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel configuration to consume this topic and interpret the change events.
+ +```bash +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "test-canal-source" + result_table_name = "kafka_name" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = canal-json + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource Review Comment: remove ########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.kafka; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class CannalToKafakIT extends TestSuiteBase implements TestResource { + + private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class); + + private static GenericContainer<?> CANAL_CONTAINER; + + private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6"; + + private static final String CANAL_HOST = 
"canal_e2e"; + + private static final int CANAL_PORT = 11111; + + //---------------------------------------------------------------------------- + // kafka + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:latest"; + + private static final String KAFKA_TOPIC = "test-canal-sink"; + + private static final int KAFKA_PORT = 9093; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private static KafkaContainer KAFKA_CONTAINER; + + private KafkaConsumer<String, String> kafkaConsumer; + + //---------------------------------------------------------------------------- + // mysql + private static final String MYSQL_HOST = "mysql_e2e"; + + private static final int MYSQL_PORT = 3306; + + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw"); + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + MySqlContainer mySqlContainer = new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + .withDatabaseName("canal") + .withUsername("st_user") + .withPassword("seatunnel") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + mySqlContainer.setPortBindings(com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", MYSQL_PORT, MYSQL_PORT))); + return mySqlContainer; + } + + private void createCanalContainer() { + CANAL_CONTAINER = new GenericContainer<>(CANAL_DOCKER_IMAGE) + .withCopyFileToContainer(MountableFile.forClasspathResource("canal/canal.properties"), "/app/server/conf/canal.properties") + .withCopyFileToContainer(MountableFile.forClasspathResource("canal/instance.properties"), "/app/server/conf/example/instance.properties") + .withNetwork(NETWORK) + .withNetworkAliases(CANAL_HOST) + .withCommand() + .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE))); + CANAL_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", CANAL_PORT, CANAL_PORT))); + } + + private void createKafkaContainer(){ + KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + KAFKA_CONTAINER.setPortBindings(com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + } + + @BeforeAll + @Override + public void startUp() { + + LOG.info("The third stage: Starting Kafka containers..."); + createKafkaContainer(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + LOG.info("Containers are started"); + + LOG.info("The first stage: Starting Mysql containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Containers are started"); + + LOG.info("The first stage: Starting Canal containers..."); + createCanalContainer(); + Startables.deepStart(Stream.of(CANAL_CONTAINER)).join(); + LOG.info("Containers are started"); + + inventoryDatabase.createAndInitialize(); + + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initKafkaConsumer); + } + + @TestTemplate + public void testCannalToKafakCannalFormatAnalysis(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/kafkasource_canal_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + ArrayList<Object> result = new ArrayList<>(); + List<String> expectedResult = + Arrays.asList( + "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}", + 
"{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":\"8.1\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":\"0.8\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":\"0.75\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":\"0.875\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":\"1.0\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":\"5.3\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":\"0.1\"},\"type\":\"INSERT\"}", + "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":\"22.2\"},\"type\":\"INSERT\"}" + ); + + ArrayList<String> topics = new ArrayList<>(); + topics.add(KAFKA_TOPIC); + kafkaConsumer.subscribe(topics); + while (result.size() < 9) { Review Comment: Infinite loop ########## seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonFormatFactory.java: ########## @@ -51,22 +49,12 @@ public DeserializationFormat createDeserializationFormat(TableFactoryContext con boolean ignoreParseErrors = JsonFormatOptions.getIgnoreParseErrors(options); // TODO config SeaTunnelRowType - return new DeserializationFormat() { - @Override - public DeserializationSchema createDeserializationSchema() { - return new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null); - } - }; + return () -> new JsonDeserializationSchema(failOnMissingField, ignoreParseErrors, null); Review Comment: lambda cannot serialize and deserialize -- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
