This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 514bf8467 [cdc] refactor mysql cdc e2e test to reuse codes (#1604)
514bf8467 is described below

commit 514bf84674b14fe0d6ceaadf716bfb9715cae7d4
Author: legendtkl <[email protected]>
AuthorDate: Thu Jul 20 13:24:38 2023 +0800

    [cdc] refactor mysql cdc e2e test to reuse codes (#1604)
---
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      | 259 ++++++++++-----------
 .../tests/cdc/MySqlComputedColumnE2ETest.java      |  55 +----
 .../tests/cdc/MySqlTinyIntConvertE2ETest.java      |  61 ++---
 3 files changed, 150 insertions(+), 225 deletions(-)

diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index c464f1fcf..aa4c83c94 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -22,6 +22,9 @@ import 
org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
 import org.apache.paimon.tests.E2eTestBase;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.shaded.org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -31,10 +34,15 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 
+import javax.annotation.Nonnull;
+
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -48,6 +56,10 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
     private static final String USER = "paimonuser";
     private static final String PASSWORD = "paimonpw";
 
+    protected static final String ACTION_SYNC_TABLE = "mysql-sync-table";
+
+    protected static final String ACTION_SYNC_DATABASE = "mysql-sync-database";
+
     private final MySqlVersion mySqlVersion;
     protected MySqlContainer mySqlContainer;
 
@@ -98,44 +110,14 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
 
     @Test
     public void testSyncTable() throws Exception {
-        String runActionCommand =
-                String.join(
-                        " ",
-                        "bin/flink",
-                        "run",
-                        "-D",
-                        "execution.checkpointing.interval=1s",
-                        "--detached",
-                        "lib/paimon-flink-action.jar",
-                        "mysql-sync-table",
-                        "--warehouse",
-                        warehousePath,
-                        "--database",
-                        "default",
-                        "--table",
-                        "ts_table",
-                        "--partition-keys",
-                        "pt",
-                        "--primary-keys",
-                        "pt,_id",
-                        "--mysql-conf",
-                        "hostname=mysql-1",
-                        "--mysql-conf",
-                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
-                        "--mysql-conf",
-                        String.format("username='%s'", 
mySqlContainer.getUsername()),
-                        "--mysql-conf",
-                        String.format("password='%s'", 
mySqlContainer.getPassword()),
-                        "--mysql-conf",
-                        "database-name='paimon_sync_table'",
-                        "--mysql-conf",
-                        "table-name='schema_evolution_.+'",
-                        "--table-conf",
-                        "bucket=2");
-        Container.ExecResult execResult =
-                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
-        LOG.info(execResult.getStdout());
-        LOG.info(execResult.getStderr());
+        runAction(
+                ACTION_SYNC_TABLE,
+                "pt",
+                "pt,_id",
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                        "database-name", "paimon_sync_table", "table-name", 
"schema_evolution_.+"),
+                ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
                 Statement statement = conn.createStatement()) {
@@ -214,33 +196,14 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
 
     @Test
     public void testSyncDatabase() throws Exception {
-        String runActionCommand =
-                String.join(
-                        " ",
-                        "bin/flink",
-                        "run",
-                        "-D",
-                        "execution.checkpointing.interval=1s",
-                        "--detached",
-                        "lib/paimon-flink-action.jar",
-                        "mysql-sync-database",
-                        "--warehouse",
-                        warehousePath,
-                        "--database",
-                        "default",
-                        "--mysql-conf",
-                        "hostname=mysql-1",
-                        "--mysql-conf",
-                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
-                        "--mysql-conf",
-                        String.format("username='%s'", 
mySqlContainer.getUsername()),
-                        "--mysql-conf",
-                        String.format("password='%s'", 
mySqlContainer.getPassword()),
-                        "--mysql-conf",
-                        "database-name='paimon_sync_database'",
-                        "--table-conf",
-                        "bucket=2");
-        jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+
+        runAction(
+                ACTION_SYNC_DATABASE,
+                null,
+                null,
+                ImmutableMap.of(),
+                ImmutableMap.of("database-name", "paimon_sync_database"),
+                ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
                 Statement statement = conn.createStatement()) {
@@ -306,46 +269,19 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
 
     @Test
     public void testSyncTableWithTinyConvert() throws Exception {
-        String runActionCommand =
-                String.join(
-                        " ",
-                        "bin/flink",
-                        "run",
-                        "-D",
-                        "execution.checkpointing.interval=1s",
-                        "--detached",
-                        "lib/paimon-flink-action.jar",
-                        "mysql-sync-table",
-                        "--warehouse",
-                        warehousePath,
-                        "--database",
-                        "default",
-                        "--table",
-                        "ts_table",
-                        "--partition-keys",
-                        "pt",
-                        "--primary-keys",
-                        "pt,_id",
-                        "--mysql-conf",
-                        "hostname=mysql-1",
-                        "--mysql-conf",
-                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
-                        "--mysql-conf",
-                        String.format("username='%s'", 
mySqlContainer.getUsername()),
-                        "--mysql-conf",
-                        String.format("password='%s'", 
mySqlContainer.getPassword()),
-                        "--mysql-conf",
-                        "database-name='paimon_sync_table'",
-                        "--mysql-conf",
-                        "table-name='tinyint_schema_evolution_.+'",
-                        "--mysql-conf",
-                        "mysql.converter.tinyint1-to-bool='false'",
-                        "--table-conf",
-                        "bucket=2");
-        Container.ExecResult execResult =
-                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
-        LOG.info(execResult.getStdout());
-        LOG.info(execResult.getStderr());
+        runAction(
+                ACTION_SYNC_TABLE,
+                "pt",
+                "pt,_id",
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                        "database-name",
+                        "paimon_sync_table",
+                        "table-name",
+                        "tinyint_schema_evolution_.+",
+                        "mysql.converter.tinyint1-to-bool",
+                        "false"),
+                ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
                 Statement statement = conn.createStatement()) {
@@ -398,35 +334,17 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
 
     @Test
     public void testSyncDatabaseWithTinyConvert() throws Exception {
-        String runActionCommand =
-                String.join(
-                        " ",
-                        "bin/flink",
-                        "run",
-                        "-D",
-                        "execution.checkpointing.interval=1s",
-                        "--detached",
-                        "lib/paimon-flink-action.jar",
-                        "mysql-sync-database",
-                        "--warehouse",
-                        warehousePath,
-                        "--database",
-                        "default",
-                        "--mysql-conf",
-                        "hostname=mysql-1",
-                        "--mysql-conf",
-                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
-                        "--mysql-conf",
-                        String.format("username='%s'", 
mySqlContainer.getUsername()),
-                        "--mysql-conf",
-                        String.format("password='%s'", 
mySqlContainer.getPassword()),
-                        "--mysql-conf",
-                        "database-name='paimon_sync_database_tinyint'",
-                        "--mysql-conf",
-                        "mysql.converter.tinyint1-to-bool='false'",
-                        "--table-conf",
-                        "bucket=2");
-        jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+        runAction(
+                ACTION_SYNC_DATABASE,
+                null,
+                null,
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                        "database-name",
+                        "paimon_sync_database_tinyint",
+                        "mysql.converter.tinyint1-to-bool",
+                        "false"),
+                ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
                 Statement statement = conn.createStatement()) {
@@ -506,4 +424,77 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
     protected void cancelJob(String jobId) throws Exception {
         jobManager.execInContainer("bin/flink", "cancel", jobId);
     }
+
+    protected void runAction(
+            @Nonnull String action,
+            String partitionKeys,
+            String primaryKeys,
+            @Nonnull Map<String, String> computedColumn,
+            @Nonnull Map<String, String> mysqlConf,
+            @Nonnull Map<String, String> tableConf)
+            throws Exception {
+
+        String partitionKeysStr =
+                StringUtils.isBlank(partitionKeys) ? "" : "--partition-keys " 
+ partitionKeys;
+        String primaryKeysStr =
+                StringUtils.isBlank(primaryKeys) ? "" : "--primary-keys " + 
primaryKeys;
+        String tableStr = action.equals(ACTION_SYNC_TABLE) ? "--table 
ts_table" : "";
+
+        List<String> computedColumns =
+                computedColumn.keySet().stream()
+                        .map(key -> String.format("%s=%s", key, 
computedColumn.get(key)))
+                        .flatMap(s -> Stream.of("--computed-column", s))
+                        .collect(Collectors.toList());
+
+        List<String> mysqlConfs =
+                mysqlConf.keySet().stream()
+                        .map(key -> String.format("%s=%s", key, 
mysqlConf.get(key)))
+                        .flatMap(s -> Stream.of("--mysql-conf", s))
+                        .collect(Collectors.toList());
+
+        List<String> tableConfs =
+                tableConf.keySet().stream()
+                        .map(key -> String.format("%s=%s", key, 
tableConf.get(key)))
+                        .flatMap(s -> Stream.of("--table-conf", s))
+                        .collect(Collectors.toList());
+
+        String runActionCommand =
+                String.join(
+                        " ",
+                        "bin/flink",
+                        "run",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/paimon-flink-action.jar",
+                        action,
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        tableStr,
+                        partitionKeysStr,
+                        primaryKeysStr,
+                        "--mysql-conf",
+                        "hostname=mysql-1",
+                        "--mysql-conf",
+                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
+                        "--mysql-conf",
+                        String.format("username='%s'", 
mySqlContainer.getUsername()),
+                        "--mysql-conf",
+                        String.format("password='%s'", 
mySqlContainer.getPassword()));
+
+        runActionCommand +=
+                " "
+                        + String.join(" ", computedColumns)
+                        + " "
+                        + String.join(" ", mysqlConfs)
+                        + " "
+                        + String.join(" ", tableConfs);
+
+        Container.ExecResult execResult =
+                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+    }
 }
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 54b962a10..1f4dc3f27 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -18,14 +18,12 @@
 
 package org.apache.paimon.tests.cdc;
 
-import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
 
 import java.sql.Connection;
 import java.sql.Statement;
@@ -33,54 +31,19 @@ import java.sql.Statement;
 /** E2e test for MySql CDC with computed column. */
 public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlComputedColumnE2ETest.class);
-
     protected MySqlComputedColumnE2ETest() {
         super(MySqlVersion.V5_7);
     }
 
     @Test
     public void testSyncTable() throws Exception {
-        String runActionCommand =
-                String.join(
-                        " ",
-                        "bin/flink",
-                        "run",
-                        "-D",
-                        "execution.checkpointing.interval=1s",
-                        "--detached",
-                        "lib/paimon-flink-action.jar",
-                        "mysql-sync-table",
-                        "--warehouse",
-                        warehousePath,
-                        "--database",
-                        "default",
-                        "--table",
-                        "ts_table",
-                        "--partition-keys",
-                        // computed from _datetime
-                        "_year",
-                        "--primary-keys",
-                        "pk,_year",
-                        "--computed-column '_year=year(_datetime)'",
-                        "--mysql-conf",
-                        "hostname=mysql-1",
-                        "--mysql-conf",
-                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
-                        "--mysql-conf",
-                        String.format("username='%s'", 
mySqlContainer.getUsername()),
-                        "--mysql-conf",
-                        String.format("password='%s'", 
mySqlContainer.getPassword()),
-                        "--mysql-conf",
-                        "database-name='test_computed_column'",
-                        "--mysql-conf",
-                        "table-name='T'",
-                        "--table-conf",
-                        "bucket=2");
-        Container.ExecResult execResult =
-                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
-        LOG.info(execResult.getStdout());
-        LOG.info(execResult.getStderr());
+        runAction(
+                ACTION_SYNC_TABLE,
+                "_year",
+                "pk,_year",
+                ImmutableMap.of("_year", "'year(_datetime)'"),
+                ImmutableMap.of("database-name", "'test_computed_column'", 
"table-name", "'T'"),
+                ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
                 Statement statement = conn.createStatement()) {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
index 3ff120293..489de4c21 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -18,14 +18,12 @@
 
 package org.apache.paimon.tests.cdc;
 
-import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
 
 import java.sql.Connection;
 import java.sql.Statement;
@@ -33,52 +31,25 @@ import java.sql.Statement;
 /** E2e test for MySql CDC type convert tinyint(1) to tinyint. */
 public class MySqlTinyIntConvertE2ETest extends MySqlCdcE2eTestBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlTinyIntConvertE2ETest.class);
-
     protected MySqlTinyIntConvertE2ETest() {
         super(MySqlVersion.V5_7);
     }
 
     @Test
     public void testSyncTable() throws Exception {
-        String runActionCommand =
-                String.join(
-                        " ",
-                        "bin/flink",
-                        "run",
-                        "-D",
-                        "execution.checkpointing.interval=1s",
-                        "--detached",
-                        "lib/paimon-flink-action.jar",
-                        "mysql-sync-table",
-                        "--warehouse",
-                        warehousePath,
-                        "--database",
-                        "default",
-                        "--table",
-                        "tinyint_table",
-                        "--primary-keys",
-                        "pk",
-                        "--mysql-conf",
-                        "hostname=mysql-1",
-                        "--mysql-conf",
-                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
-                        "--mysql-conf",
-                        String.format("username='%s'", 
mySqlContainer.getUsername()),
-                        "--mysql-conf",
-                        String.format("password='%s'", 
mySqlContainer.getPassword()),
-                        "--mysql-conf",
-                        "database-name='test_tinyint_convert'",
-                        "--mysql-conf",
-                        "table-name='T'",
-                        "--mysql-conf",
-                        "mysql.converter.tinyint1-to-bool='false'",
-                        "--table-conf",
-                        "bucket=2");
-        Container.ExecResult execResult =
-                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
-        LOG.info(execResult.getStdout());
-        LOG.info(execResult.getStderr());
+        runAction(
+                ACTION_SYNC_TABLE,
+                null,
+                "pk",
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                        "database-name",
+                        "test_tinyint_convert",
+                        "table-name",
+                        "'T'",
+                        "mysql.converter.tinyint1-to-bool",
+                        "false"),
+                ImmutableMap.of("bucket", "2"));
 
         try (Connection conn = getMySqlConnection();
                 Statement statement = conn.createStatement()) {
@@ -88,7 +59,7 @@ public class MySqlTinyIntConvertE2ETest extends 
MySqlCdcE2eTestBase {
 
             String jobId =
                     runSql(
-                            "INSERT INTO result1 SELECT * FROM tinyint_table",
+                            "INSERT INTO result1 SELECT * FROM ts_table",
                             catalogDdl,
                             useCatalogCmd,
                             createResultSink(

Reply via email to