hailin0 commented on code in PR #4225: URL: https://github.com/apache/seatunnel/pull/4225#discussion_r1366424668
########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java: ########## @@ -0,0 +1,731 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.kafka; + +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import 
org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import com.google.common.collect.Lists; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; +import static org.awaitility.Awaitility.given; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = "Spark engine will lose the row kind of 
record") +public class KafkaFormatIT extends TestSuiteBase implements TestResource { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaFormatIT.class); + + // ---------------------------Ogg Format Parameter--------------------------------------- + private static final String OGG_DATA_PATH = "/ogg/ogg_data.txt"; + private static final String OGG_KAFKA_SOURCE_TOPIC = "test-ogg-source"; + private static final String OGG_KAFKA_SINK_TOPIC = "test-ogg-sink"; + + // ---------------------------Canal Format Parameter--------------------------------------- + + private static final String CANAL_KAFKA_SINK_TOPIC = "test-canal-sink"; + private static final String CANAL_MYSQL_DATABASE = "canal"; + + // ---------------------------Compatible Format Parameter--------------------------------------- + private static final String COMPATIBLE_DATA_PATH = "/compatible/compatible_data.txt"; + private static final String COMPATIBLE_KAFKA_SINK_TOPIC = "jdbc_source_record"; + + // Used to map local data paths to kafka topics that need to be written to kafka + private static LinkedHashMap<String, String> LOCAL_DATA_TO_KAFKA_MAPPING; + + static { + LOCAL_DATA_TO_KAFKA_MAPPING = + new LinkedHashMap<String, String>() { + { + put(OGG_DATA_PATH, OGG_KAFKA_SOURCE_TOPIC); + put(COMPATIBLE_DATA_PATH, COMPATIBLE_KAFKA_SINK_TOPIC); + } + }; + } + + // ---------------------------Canal Container--------------------------------------- + private static GenericContainer<?> CANAL_CONTAINER; + + private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6"; + + private static final String CANAL_HOST = "canal_e2e"; + + // ---------------------------Debezium Container--------------------------------------- + + private static GenericContainer<?> DEBEZIUM_CONTAINER; + + private static final String DEBEZIUM_DOCKER_IMAGE = "quay.io/debezium/connect:2.3.0.Final"; + + private static final String DEBEZIUM_HOST = "debezium_e2e"; + + private static final int DEBEZIUM_PORT = 8083; + private
static final String DEBEZIUM_KAFKA_TOPIC = "test-debezium-sink"; + private static final String DEBEZIUM_MYSQL_DATABASE = "debezium"; + + // ---------------------------Kafka Container--------------------------------------- + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; + + private static final String KAFKA_HOST = "kafka_e2e"; + + private static KafkaContainer KAFKA_CONTAINER; + + private KafkaConsumer<String, String> kafkaConsumer; + + // ---------------------------Mysql Container--------------------------------------- + private static final String MYSQL_HOST = "mysql_e2e"; + private static final String MYSQL_USER_NAME = "st_user"; + private static final String MYSQL_PASSWORD = "seatunnel"; + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(); + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, + CANAL_MYSQL_DATABASE, + "mysqluser", + "mysqlpw", + "initialize_format"); + + // --------------------------- Postgres Container------------------------------------- + private static final String PG_IMAGE = "postgres:alpine3.16"; + + private static final String PG_DRIVER_JAR = + "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; + + private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + PG_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + private static MySqlContainer createMySqlContainer() { + return new MySqlContainer(MySqlVersion.V8_0) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withNetwork(NETWORK) + .withNetworkAliases(MYSQL_HOST) + 
.withDatabaseName(CANAL_MYSQL_DATABASE) + .withDatabaseName(DEBEZIUM_MYSQL_DATABASE) + .withUsername(MYSQL_USER_NAME) + .withPassword(MYSQL_PASSWORD) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + private void createCanalContainer() { + CANAL_CONTAINER = + new GenericContainer<>(CANAL_DOCKER_IMAGE) + .withCopyFileToContainer( + MountableFile.forClasspathResource("canal/canal.properties"), + "/app/server/conf/canal.properties") + .withCopyFileToContainer( + MountableFile.forClasspathResource("canal/instance.properties"), + "/app/server/conf/example/instance.properties") + .withNetwork(NETWORK) + .withNetworkAliases(CANAL_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE))); + } + + private void createDebeziumContainer() { + DEBEZIUM_CONTAINER = + new GenericContainer<>(DEBEZIUM_DOCKER_IMAGE) + .withCopyFileToContainer( + MountableFile.forClasspathResource("/debezium/register-mysql.json"), + "/tmp/seatunnel/plugins/Jdbc/register-mysql.json") + .withNetwork(NETWORK) + .withNetworkAliases(DEBEZIUM_HOST) + .withExposedPorts(DEBEZIUM_PORT) + .withEnv("GROUP_ID", "1") + .withEnv("CONFIG_STORAGE_TOPIC", "my-connect-configs") + .withEnv("OFFSET_STORAGE_TOPIC", "my-connect-offsets") + .withEnv("STATUS_STORAGE_TOPIC", "my-connect-status") + .withEnv("BOOTSTRAP_SERVERS", KAFKA_HOST + ":9092") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE))) + .dependsOn(KAFKA_CONTAINER, MYSQL_CONTAINER); + DEBEZIUM_CONTAINER.setWaitStrategy( + (new HttpWaitStrategy()) + .forPath("/connectors") + .forPort(DEBEZIUM_PORT) + .withStartupTimeout(Duration.ofSeconds(120))); + DEBEZIUM_CONTAINER.setPortBindings( + com.google.common.collect.Lists.newArrayList( + String.format("%s:%s", DEBEZIUM_PORT, DEBEZIUM_PORT))); + } + + private void createKafkaContainer() { + KAFKA_CONTAINER = + new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + 
.withNetworkAliases(KAFKA_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + } + + private void createPostgreSQLContainer() { + POSTGRESQL_CONTAINER = + new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE)) + .withNetwork(NETWORK) + .withNetworkAliases("postgresql") + .withExposedPorts(5432) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + } + + @BeforeAll + @Override + public void startUp() throws ClassNotFoundException, InterruptedException, IOException { Review Comment: This method starts too many unrelated containers, which may prevent effective use of CI parallelism -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
