This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 514bf8467 [cdc] refactor mysql cdc e2e test to reuse codes (#1604)
514bf8467 is described below
commit 514bf84674b14fe0d6ceaadf716bfb9715cae7d4
Author: legendtkl <[email protected]>
AuthorDate: Thu Jul 20 13:24:38 2023 +0800
[cdc] refactor mysql cdc e2e test to reuse codes (#1604)
---
.../paimon/tests/cdc/MySqlCdcE2eTestBase.java | 259 ++++++++++-----------
.../tests/cdc/MySqlComputedColumnE2ETest.java | 55 +----
.../tests/cdc/MySqlTinyIntConvertE2ETest.java | 61 ++---
3 files changed, 150 insertions(+), 225 deletions(-)
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index c464f1fcf..aa4c83c94 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -22,6 +22,9 @@ import
org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
import org.apache.paimon.tests.E2eTestBase;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.shaded.org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -31,10 +34,15 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
+import javax.annotation.Nonnull;
+
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -48,6 +56,10 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
private static final String USER = "paimonuser";
private static final String PASSWORD = "paimonpw";
+ protected static final String ACTION_SYNC_TABLE = "mysql-sync-table";
+
+ protected static final String ACTION_SYNC_DATABASE = "mysql-sync-database";
+
private final MySqlVersion mySqlVersion;
protected MySqlContainer mySqlContainer;
@@ -98,44 +110,14 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
@Test
public void testSyncTable() throws Exception {
- String runActionCommand =
- String.join(
- " ",
- "bin/flink",
- "run",
- "-D",
- "execution.checkpointing.interval=1s",
- "--detached",
- "lib/paimon-flink-action.jar",
- "mysql-sync-table",
- "--warehouse",
- warehousePath,
- "--database",
- "default",
- "--table",
- "ts_table",
- "--partition-keys",
- "pt",
- "--primary-keys",
- "pt,_id",
- "--mysql-conf",
- "hostname=mysql-1",
- "--mysql-conf",
- String.format("port=%d", MySqlContainer.MYSQL_PORT),
- "--mysql-conf",
- String.format("username='%s'",
mySqlContainer.getUsername()),
- "--mysql-conf",
- String.format("password='%s'",
mySqlContainer.getPassword()),
- "--mysql-conf",
- "database-name='paimon_sync_table'",
- "--mysql-conf",
- "table-name='schema_evolution_.+'",
- "--table-conf",
- "bucket=2");
- Container.ExecResult execResult =
- jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
- LOG.info(execResult.getStdout());
- LOG.info(execResult.getStderr());
+ runAction(
+ ACTION_SYNC_TABLE,
+ "pt",
+ "pt,_id",
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ "database-name", "paimon_sync_table", "table-name",
"schema_evolution_.+"),
+ ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
@@ -214,33 +196,14 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
@Test
public void testSyncDatabase() throws Exception {
- String runActionCommand =
- String.join(
- " ",
- "bin/flink",
- "run",
- "-D",
- "execution.checkpointing.interval=1s",
- "--detached",
- "lib/paimon-flink-action.jar",
- "mysql-sync-database",
- "--warehouse",
- warehousePath,
- "--database",
- "default",
- "--mysql-conf",
- "hostname=mysql-1",
- "--mysql-conf",
- String.format("port=%d", MySqlContainer.MYSQL_PORT),
- "--mysql-conf",
- String.format("username='%s'",
mySqlContainer.getUsername()),
- "--mysql-conf",
- String.format("password='%s'",
mySqlContainer.getPassword()),
- "--mysql-conf",
- "database-name='paimon_sync_database'",
- "--table-conf",
- "bucket=2");
- jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+
+ runAction(
+ ACTION_SYNC_DATABASE,
+ null,
+ null,
+ ImmutableMap.of(),
+ ImmutableMap.of("database-name", "paimon_sync_database"),
+ ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
@@ -306,46 +269,19 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
@Test
public void testSyncTableWithTinyConvert() throws Exception {
- String runActionCommand =
- String.join(
- " ",
- "bin/flink",
- "run",
- "-D",
- "execution.checkpointing.interval=1s",
- "--detached",
- "lib/paimon-flink-action.jar",
- "mysql-sync-table",
- "--warehouse",
- warehousePath,
- "--database",
- "default",
- "--table",
- "ts_table",
- "--partition-keys",
- "pt",
- "--primary-keys",
- "pt,_id",
- "--mysql-conf",
- "hostname=mysql-1",
- "--mysql-conf",
- String.format("port=%d", MySqlContainer.MYSQL_PORT),
- "--mysql-conf",
- String.format("username='%s'",
mySqlContainer.getUsername()),
- "--mysql-conf",
- String.format("password='%s'",
mySqlContainer.getPassword()),
- "--mysql-conf",
- "database-name='paimon_sync_table'",
- "--mysql-conf",
- "table-name='tinyint_schema_evolution_.+'",
- "--mysql-conf",
- "mysql.converter.tinyint1-to-bool='false'",
- "--table-conf",
- "bucket=2");
- Container.ExecResult execResult =
- jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
- LOG.info(execResult.getStdout());
- LOG.info(execResult.getStderr());
+ runAction(
+ ACTION_SYNC_TABLE,
+ "pt",
+ "pt,_id",
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ "database-name",
+ "paimon_sync_table",
+ "table-name",
+ "tinyint_schema_evolution_.+",
+ "mysql.converter.tinyint1-to-bool",
+ "false"),
+ ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
@@ -398,35 +334,17 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
@Test
public void testSyncDatabaseWithTinyConvert() throws Exception {
- String runActionCommand =
- String.join(
- " ",
- "bin/flink",
- "run",
- "-D",
- "execution.checkpointing.interval=1s",
- "--detached",
- "lib/paimon-flink-action.jar",
- "mysql-sync-database",
- "--warehouse",
- warehousePath,
- "--database",
- "default",
- "--mysql-conf",
- "hostname=mysql-1",
- "--mysql-conf",
- String.format("port=%d", MySqlContainer.MYSQL_PORT),
- "--mysql-conf",
- String.format("username='%s'",
mySqlContainer.getUsername()),
- "--mysql-conf",
- String.format("password='%s'",
mySqlContainer.getPassword()),
- "--mysql-conf",
- "database-name='paimon_sync_database_tinyint'",
- "--mysql-conf",
- "mysql.converter.tinyint1-to-bool='false'",
- "--table-conf",
- "bucket=2");
- jobManager.execInContainer("su", "flink", "-c", runActionCommand);
+ runAction(
+ ACTION_SYNC_DATABASE,
+ null,
+ null,
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ "database-name",
+ "paimon_sync_database_tinyint",
+ "mysql.converter.tinyint1-to-bool",
+ "false"),
+ ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
@@ -506,4 +424,77 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
protected void cancelJob(String jobId) throws Exception {
jobManager.execInContainer("bin/flink", "cancel", jobId);
}
+
+ protected void runAction(
+ @Nonnull String action,
+ String partitionKeys,
+ String primaryKeys,
+ @Nonnull Map<String, String> computedColumn,
+ @Nonnull Map<String, String> mysqlConf,
+ @Nonnull Map<String, String> tableConf)
+ throws Exception {
+
+ String partitionKeysStr =
+ StringUtils.isBlank(partitionKeys) ? "" : "--partition-keys " +
partitionKeys;
+ String primaryKeysStr =
+ StringUtils.isBlank(primaryKeys) ? "" : "--primary-keys " +
primaryKeys;
+ String tableStr = action.equals(ACTION_SYNC_TABLE) ?
"--table ts_table" : "";
+
+ List<String> computedColumns =
+ computedColumn.keySet().stream()
+ .map(key -> String.format("%s=%s", key,
computedColumn.get(key)))
+ .flatMap(s -> Stream.of("--computed-column", s))
+ .collect(Collectors.toList());
+
+ List<String> mysqlConfs =
+ mysqlConf.keySet().stream()
+ .map(key -> String.format("%s=%s", key,
mysqlConf.get(key)))
+ .flatMap(s -> Stream.of("--mysql-conf", s))
+ .collect(Collectors.toList());
+
+ List<String> tableConfs =
+ tableConf.keySet().stream()
+ .map(key -> String.format("%s=%s", key,
tableConf.get(key)))
+ .flatMap(s -> Stream.of("--table-conf", s))
+ .collect(Collectors.toList());
+
+ String runActionCommand =
+ String.join(
+ " ",
+ "bin/flink",
+ "run",
+ "-D",
+ "execution.checkpointing.interval=1s",
+ "--detached",
+ "lib/paimon-flink-action.jar",
+ action,
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ tableStr,
+ partitionKeysStr,
+ primaryKeysStr,
+ "--mysql-conf",
+ "hostname=mysql-1",
+ "--mysql-conf",
+ String.format("port=%d", MySqlContainer.MYSQL_PORT),
+ "--mysql-conf",
+ String.format("username='%s'",
mySqlContainer.getUsername()),
+ "--mysql-conf",
+ String.format("password='%s'",
mySqlContainer.getPassword()));
+
+ runActionCommand +=
+ " "
+ + String.join(" ", computedColumns)
+ + " "
+ + String.join(" ", mysqlConfs)
+ + " "
+ + String.join(" ", tableConfs);
+
+ Container.ExecResult execResult =
+ jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+ }
}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 54b962a10..1f4dc3f27 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -18,14 +18,12 @@
package org.apache.paimon.tests.cdc;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
import java.sql.Connection;
import java.sql.Statement;
@@ -33,54 +31,19 @@ import java.sql.Statement;
/** E2e test for MySql CDC with computed column. */
public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlComputedColumnE2ETest.class);
-
protected MySqlComputedColumnE2ETest() {
super(MySqlVersion.V5_7);
}
@Test
public void testSyncTable() throws Exception {
- String runActionCommand =
- String.join(
- " ",
- "bin/flink",
- "run",
- "-D",
- "execution.checkpointing.interval=1s",
- "--detached",
- "lib/paimon-flink-action.jar",
- "mysql-sync-table",
- "--warehouse",
- warehousePath,
- "--database",
- "default",
- "--table",
- "ts_table",
- "--partition-keys",
- // computed from _datetime
- "_year",
- "--primary-keys",
- "pk,_year",
- "--computed-column '_year=year(_datetime)'",
- "--mysql-conf",
- "hostname=mysql-1",
- "--mysql-conf",
- String.format("port=%d", MySqlContainer.MYSQL_PORT),
- "--mysql-conf",
- String.format("username='%s'",
mySqlContainer.getUsername()),
- "--mysql-conf",
- String.format("password='%s'",
mySqlContainer.getPassword()),
- "--mysql-conf",
- "database-name='test_computed_column'",
- "--mysql-conf",
- "table-name='T'",
- "--table-conf",
- "bucket=2");
- Container.ExecResult execResult =
- jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
- LOG.info(execResult.getStdout());
- LOG.info(execResult.getStderr());
+ runAction(
+ ACTION_SYNC_TABLE,
+ "_year",
+ "pk,_year",
+ ImmutableMap.of("_year", "'year(_datetime)'"),
+ ImmutableMap.of("database-name", "'test_computed_column'",
"table-name", "'T'"),
+ ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
index 3ff120293..489de4c21 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -18,14 +18,12 @@
package org.apache.paimon.tests.cdc;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
import java.sql.Connection;
import java.sql.Statement;
@@ -33,52 +31,25 @@ import java.sql.Statement;
/** E2e test for MySql CDC type convert tinyint(1) to tinyint. */
public class MySqlTinyIntConvertE2ETest extends MySqlCdcE2eTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlTinyIntConvertE2ETest.class);
-
protected MySqlTinyIntConvertE2ETest() {
super(MySqlVersion.V5_7);
}
@Test
public void testSyncTable() throws Exception {
- String runActionCommand =
- String.join(
- " ",
- "bin/flink",
- "run",
- "-D",
- "execution.checkpointing.interval=1s",
- "--detached",
- "lib/paimon-flink-action.jar",
- "mysql-sync-table",
- "--warehouse",
- warehousePath,
- "--database",
- "default",
- "--table",
- "tinyint_table",
- "--primary-keys",
- "pk",
- "--mysql-conf",
- "hostname=mysql-1",
- "--mysql-conf",
- String.format("port=%d", MySqlContainer.MYSQL_PORT),
- "--mysql-conf",
- String.format("username='%s'",
mySqlContainer.getUsername()),
- "--mysql-conf",
- String.format("password='%s'",
mySqlContainer.getPassword()),
- "--mysql-conf",
- "database-name='test_tinyint_convert'",
- "--mysql-conf",
- "table-name='T'",
- "--mysql-conf",
- "mysql.converter.tinyint1-to-bool='false'",
- "--table-conf",
- "bucket=2");
- Container.ExecResult execResult =
- jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
- LOG.info(execResult.getStdout());
- LOG.info(execResult.getStderr());
+ runAction(
+ ACTION_SYNC_TABLE,
+ null,
+ "pk",
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ "database-name",
+ "test_tinyint_convert",
+ "table-name",
+ "'T'",
+ "mysql.converter.tinyint1-to-bool",
+ "false"),
+ ImmutableMap.of("bucket", "2"));
try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
@@ -88,7 +59,7 @@ public class MySqlTinyIntConvertE2ETest extends
MySqlCdcE2eTestBase {
String jobId =
runSql(
- "INSERT INTO result1 SELECT * FROM tinyint_table",
+ "INSERT INTO result1 SELECT * FROM ts_table",
catalogDdl,
useCatalogCmd,
createResultSink(