zhilinli123 commented on code in PR #4225:
URL: https://github.com/apache/seatunnel/pull/4225#discussion_r1366416606


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java:
##########
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.google.common.collect.Lists;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason = "Spark engine will lose the row kind of record")
+public class KafkaFormatIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaFormatIT.class);
+
+    // ---------------------------Ogg Format 
Parameter---------------------------------------
+    private static final String OGG_DATA_PATH = "/ogg/ogg_data.txt";
+    private static final String OGG_KAFKA_SOURCE_TOPIC = "test-ogg-source";
+    private static final String OGG_KAFKA_SINK_TOPIC = "test-ogg-sink";
+
+    // ---------------------------Canal Format 
Parameter---------------------------------------
+
+    private static final String CANAL_KAFKA_SINK_TOPIC = "test-canal-sink";
+    private static final String CANAL_MYSQL_DATABASE = "canal";
+
+    // ---------------------------Compatible Format 
Parameter---------------------------------------
+    private static final String COMPATIBLE_DATA_PATH = 
"/compatible/compatible_data.txt";
+    private static final String COMPATIBLE_KAFKA_SINK_TOPIC = 
"jdbc_source_record";
+
+    // Used to map local data paths to kafa topics that need to be written to 
kafka
+    private static LinkedHashMap<String, String> LOCAL_DATA_TO_KAFKA_MAPPING;
+
+    static {
+        LOCAL_DATA_TO_KAFKA_MAPPING =
+                new LinkedHashMap<String, String>() {
+                    {
+                        put(OGG_DATA_PATH, OGG_KAFKA_SOURCE_TOPIC);
+                        put(COMPATIBLE_DATA_PATH, COMPATIBLE_KAFKA_SINK_TOPIC);
+                    }
+                };
+    }
+
+    // ---------------------------Canal 
Container---------------------------------------
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "chinayin/canal:1.1.6";
+
+    private static final String CANAL_HOST = "canal_e2e";
+
+    // ---------------------------Debezium 
Container---------------------------------------
+
+    private static GenericContainer<?> DEBEZIUM_CONTAINER;
+
+    private static final String DEBEZIUM_DOCKER_IMAGE = 
"quay.io/debezium/connect:2.3.0.Final";
+
+    private static final String DEBEZIUM_HOST = "debezium_e2e";
+
+    private static final int DEBEZIUM_PORT = 8083;
+    private static final String DEBEZIUM_KAFKA_TOPIC = "test-debezium-sink";
+    private static final String DEBEZIUM_MYSQL_DATABASE = "debezium";
+
+    // ---------------------------Kafka 
Container---------------------------------------
+    private static final String KAFKA_IMAGE_NAME = 
"confluentinc/cp-kafka:7.0.9";
+
+    private static final String KAFKA_HOST = "kafka_e2e";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    // ---------------------------Mysql 
Container---------------------------------------
+    private static final String MYSQL_HOST = "mysql_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_PASSWORD = "seatunnel";
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer();
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER,
+                    CANAL_MYSQL_DATABASE,
+                    "mysqluser",
+                    "mysqlpw",
+                    "initialize_format");
+
+    // --------------------------- Postgres 
Container-------------------------------------
+    private static final String PG_IMAGE = "postgres:alpine3.16";
+
+    private static final String PG_DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";;
+
+    private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    private static MySqlContainer createMySqlContainer() {
+        return new MySqlContainer(MySqlVersion.V8_0)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(CANAL_MYSQL_DATABASE)
+                .withDatabaseName(DEBEZIUM_MYSQL_DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_PASSWORD)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER =
+                new GenericContainer<>(CANAL_DOCKER_IMAGE)
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("canal/canal.properties"),
+                                "/app/server/conf/canal.properties")
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("canal/instance.properties"),
+                                "/app/server/conf/example/instance.properties")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(CANAL_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+    }
+
+    private void createDebeziumContainer() {
+        DEBEZIUM_CONTAINER =
+                new GenericContainer<>(DEBEZIUM_DOCKER_IMAGE)
+                        .withCopyFileToContainer(
+                                
MountableFile.forClasspathResource("/debezium/register-mysql.json"),
+                                
"/tmp/seatunnel/plugins/Jdbc/register-mysql.json")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(DEBEZIUM_HOST)
+                        .withExposedPorts(DEBEZIUM_PORT)
+                        .withEnv("GROUP_ID", "1")
+                        .withEnv("CONFIG_STORAGE_TOPIC", "my-connect-configs")
+                        .withEnv("OFFSET_STORAGE_TOPIC", "my-connect-offsets")
+                        .withEnv("STATUS_STORAGE_TOPIC", "my-connect-status")
+                        .withEnv("BOOTSTRAP_SERVERS", KAFKA_HOST + ":9092")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE)))
+                        .dependsOn(KAFKA_CONTAINER, MYSQL_CONTAINER);
+        DEBEZIUM_CONTAINER.setWaitStrategy(
+                (new HttpWaitStrategy())
+                        .forPath("/connectors")
+                        .forPort(DEBEZIUM_PORT)
+                        .withStartupTimeout(Duration.ofSeconds(120)));
+        DEBEZIUM_CONTAINER.setPortBindings(
+                com.google.common.collect.Lists.newArrayList(
+                        String.format("%s:%s", DEBEZIUM_PORT, DEBEZIUM_PORT)));
+    }
+
+    private void createKafkaContainer() {
+        KAFKA_CONTAINER =
+                new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(KAFKA_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+    }
+
+    private void createPostgreSQLContainer() {
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withExposedPorts(5432)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException, 
IOException {
+
+        LOG.info("The first stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Kafka Containers are started");
+
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+
+        LOG.info("The third stage: Starting Debezium Connector containers...");
+        createDebeziumContainer();
+        Startables.deepStart(Stream.of(DEBEZIUM_CONTAINER)).join();
+        LOG.info("Debezium Containers are started");
+
+        LOG.info("The third stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Canal Containers are started");
+
+        LOG.info("The fourth stage: Starting PostgreSQL container...");
+        createPostgreSQLContainer();
+        Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+        Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+        LOG.info("postgresql Containers are started");
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+
+        given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initKafkaConsumer);
+
+        LOG.info("start init Mysql DDl...");
+        inventoryDatabase.createAndInitialize();
+        LOG.info("end init Mysql DDl...");
+
+        // local file ogg data send kafka
+        given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(3, TimeUnit.MINUTES)
+                .untilAsserted(this::initOggDataToKafka);
+
+        // debezium configuration information
+        Container.ExecResult extraCommand =
+                DEBEZIUM_CONTAINER.execInContainer(
+                        "bash",
+                        "-c",
+                        "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H 
\"Accept:application/json\" -H  \"Content-Type:application/json\" http://";
+                                + getLinuxLocalIp()
+                                + ":8083/connectors/ -d @register-mysql.json");
+        Assertions.assertEquals(0, extraCommand.getExitCode());
+        // ensure debezium has handled the data
+        Thread.sleep(40 * 1000);
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(3, TimeUnit.MINUTES)
+                .untilAsserted(this::updateDebeziumSourceTableData);
+    }
+
+    @TestTemplate
+    public void testFormatCheck(TestContainer container) throws IOException, 
InterruptedException {

Review Comment:
   I tested it and did not encounter any timeouts. Tasks of the same type run on a single container, which avoids the startup/teardown overhead of spinning up separate containers and saves time. If we do run into timeouts later, we can simply increase the timeout setting. This approach reduces the Kafka e2e run time by at least 70%.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to