hailin0 commented on code in PR #8082:
URL: https://github.com/apache/seatunnel/pull/8082#discussion_r1851214562


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksSchemaChangeIT.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.starrocks;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason =
+                "Currently SPARK do not support cdc. In addition, currently 
only the zeta engine supports schema evolution for pr 
https://github.com/apache/seatunnel/pull/5125.";)
+public class StarRocksSchemaChangeIT extends TestSuiteBase implements 
TestResource {
+    private static final String DATABASE = "shop";
+    private static final String SOURCE_TABLE = "products";
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "mysqluser";
+    private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+
+    private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:3.3.4";
+    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    private static final String HOST = "starrocks_cdc_e2e";
+    private static final int SR_PROXY_PORT = 8080;
+    private static final int QUERY_PORT = 9030;
+    private static final int HTTP_PORT = 8030;
+    private static final int BE_HTTP_PORT = 8040;
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static final String SINK_TABLE = "products";
+    private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT 
EXISTS " + DATABASE;
+    private static final String SR_DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";;
+
+    private Connection starRocksConnection;
+    private Connection mysqlConnection;
+    private GenericContainer<?> starRocksServer;
+
+    public static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    private static final String QUERY = "select * from %s.%s order by id";
+    private static final String QUERY_COLUMNS =
+            "SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE 
TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' ORDER by COLUMN_NAME;";
+    private static final String PROJECTION_QUERY =
+            "select 
id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s order 
by id;";
+
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase shopDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, DATABASE, "mysqluser", 
"mysqlpw", DATABASE);
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + SR_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return new MySqlContainer(version)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_USER_PASSWORD)
+                .withLogConsumer(
+                        new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+    }
+
+    private void initializeJdbcConnection() throws Exception {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(SR_DRIVER_JAR)},
+                        StarRocksCDCSinkIT.class.getClassLoader());
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        Driver driver = (Driver) 
urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
+        Properties props = new Properties();
+        props.put("user", USERNAME);
+        props.put("password", PASSWORD);
+        starRocksConnection =
+                driver.connect(
+                        String.format("jdbc:mysql://%s:%s", 
starRocksServer.getHost(), QUERY_PORT),
+                        props);
+    }
+
+    private void initializeStarRocksServer() {
+        starRocksServer =
+                new GenericContainer<>(DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
+        starRocksServer.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", QUERY_PORT, QUERY_PORT),
+                        String.format("%s:%s", HTTP_PORT, HTTP_PORT),
+                        String.format("%s:%s", BE_HTTP_PORT, BE_HTTP_PORT)));
+        Startables.deepStart(Stream.of(starRocksServer)).join();
+        log.info("StarRocks container started");
+        // wait for starrocks fully start
+        given().ignoreExceptions()
+                .await()
+                .atMost(360, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+    }
+
+    @TestTemplate
+    public void testStarRocksSinkWithSchemaEvolutionCase(TestContainer 
container)

Review Comment:
   reference
   
   
   
https://github.com/apache/seatunnel/blob/69cd4ae1a21be5e54a5f722d7f125b4ff75b3367/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java#L119



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to