This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e58f3db5 [flink] MySqlSyncTableAction now supports flink run (#824)
1e58f3db5 is described below

commit 1e58f3db533b113ab5b26ed31f6042d0fe5ef84e
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 4 17:14:09 2023 +0800

    [flink] MySqlSyncTableAction now supports flink run (#824)
---
 .github/workflows/e2e-tests-1.14-jdk11.yml         |   2 +-
 .github/workflows/e2e-tests-1.14.yml               |   2 +-
 .github/workflows/e2e-tests-1.15-jdk11.yml         |   2 +-
 .github/workflows/e2e-tests-1.15.yml               |   2 +-
 docs/content/engines/hive.md                       |   2 +-
 paimon-e2e-tests/pom.xml                           |  62 +++++
 .../java/org/apache/paimon/tests/E2eTestBase.java  |  26 ++-
 .../apache/paimon/tests/cdc/MySql57E2eTest.java    |  32 +++
 .../apache/paimon/tests/cdc/MySql80E2eTest.java    |  32 +++
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      | 230 +++++++++++++++++++
 .../test/resources-filtered/docker-compose.yaml    |   6 +-
 paimon-e2e-tests/src/test/resources/mysql/my.cnf   |  65 ++++++
 .../src/test/resources/mysql/setup.sql             |  42 ++++
 .../utils/SingleOutputStreamOperatorUtils.java     |  36 +++
 .../utils/SingleOutputStreamOperatorUtils.java     |  36 +++
 paimon-flink/paimon-flink-common/pom.xml           |  12 +
 .../org/apache/paimon/flink/action/Action.java     |   6 +
 .../action/cdc/mysql/MySqlSyncTableAction.java     | 253 ++++++++++++++++-----
 .../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java |   4 +-
 .../utils/SingleOutputStreamOperatorUtils.java     |  36 +++
 20 files changed, 815 insertions(+), 73 deletions(-)

diff --git a/.github/workflows/e2e-tests-1.14-jdk11.yml 
b/.github/workflows/e2e-tests-1.14-jdk11.yml
index 8b30734f1..fcc1463c9 100644
--- a/.github/workflows/e2e-tests-1.14-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.14-jdk11.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.14
-        run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.14
+        run: ./mvnw clean install -DskipTests -Pflink-1.14
       - name: Test Flink 1.14
         run: |
           # run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/e2e-tests-1.14.yml 
b/.github/workflows/e2e-tests-1.14.yml
index 9eecd9da3..4fd19e1f8 100644
--- a/.github/workflows/e2e-tests-1.14.yml
+++ b/.github/workflows/e2e-tests-1.14.yml
@@ -41,7 +41,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.14
-        run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.14
+        run: ./mvnw clean install -DskipTests -Pflink-1.14
       - name: Test Flink 1.14
         timeout-minutes: 60
         run: |
diff --git a/.github/workflows/e2e-tests-1.15-jdk11.yml 
b/.github/workflows/e2e-tests-1.15-jdk11.yml
index e46173118..48ae5a338 100644
--- a/.github/workflows/e2e-tests-1.15-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.15-jdk11.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.15
-        run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.15
+        run: ./mvnw clean install -DskipTests -Pflink-1.15
       - name: Test Flink 1.15
         run: |
           # run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/e2e-tests-1.15.yml 
b/.github/workflows/e2e-tests-1.15.yml
index 371922a4d..df211a942 100644
--- a/.github/workflows/e2e-tests-1.15.yml
+++ b/.github/workflows/e2e-tests-1.15.yml
@@ -41,7 +41,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.15
-        run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.15
+        run: ./mvnw clean install -DskipTests -Pflink-1.15
       - name: Test Flink 1.15
         timeout-minutes: 60
         run: |
diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index cb4db04fa..c844a3f5e 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -69,7 +69,7 @@ You can also manually build bundled jar from the source code.
 To build from source code, [clone the git repository]({{< github_repo >}}).
 
 Build bundled jar with the following command.
-`mvn clean install -Dmaven.test.skip=true`
+`mvn clean install -DskipTests`
 
 You can find Hive connector jar in 
`./paimon-hive/paimon-hive-connector-<hive-version>/target/paimon-hive-connector-<hive-version>-{{<
 version >}}.jar`.
 
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 943f3da05..af3c2706d 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
     <properties>
         <flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
+        <flink.cdc.version>2.3.0</flink.cdc.version>
         
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
         
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
     </properties>
@@ -61,6 +62,21 @@ under the License.
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- testcontainers -->
 
         <dependency>
@@ -76,6 +92,13 @@ under the License.
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -134,6 +157,16 @@ under the License.
                             <outputDirectory>/tmp/paimon-e2e-tests-jars
                             </outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>com.ververica</groupId>
+                            
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
+                            <version>${flink.cdc.version}</version>
+                            <destFileName>mysql-cdc.jar</destFileName>
+                            <type>jar</type>
+                            <overWrite>true</overWrite>
+                            <outputDirectory>/tmp/paimon-e2e-tests-jars
+                            </outputDirectory>
+                        </artifactItem>
                         <!-- test paimon with kafka sql jar -->
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
@@ -159,7 +192,36 @@ under the License.
                     </artifactItems>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.1.0</version>
+                <executions>
+                    <execution>
+                        <id>FLINK-31695</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <!--
+                    Due to FLINK-31695, we need to delete conflicting classes 
from jar file.
+                    Remove this work-around after we make our own e2e docker 
image,
+                    with both Flink and Hadoop in the same docker.
+                    -->
+                    <executable>zip</executable>
+                    <arguments>
+                        <argument>-d</argument>
+                        
<argument>/tmp/paimon-e2e-tests-jars/bundled-hadoop.jar</argument>
+                        
<argument>org/apache/commons/cli/CommandLine.class</argument>
+                    </arguments>
+                </configuration>
+            </plugin>
         </plugins>
+
         <resources>
             <resource>
                 <directory>src/test/resources</directory>
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
index f716e25c5..a3c66ef6c 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.ContainerState;
 import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.OutputFrame;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
@@ -39,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -77,6 +80,7 @@ public abstract class E2eTestBase {
     private static final int CHECK_RESULT_RETRIES = 60;
     private final List<String> currentResults = new ArrayList<>();
 
+    protected Network network;
     protected DockerComposeContainer<?> environment;
     protected ContainerState jobManager;
 
@@ -85,6 +89,8 @@ public abstract class E2eTestBase {
         List<String> services = new ArrayList<>();
         services.add("jobmanager");
         services.add("taskmanager");
+
+        network = Network.newNetwork();
         environment =
                 new DockerComposeContainer<>(
                                 new File(
@@ -92,8 +98,10 @@ public abstract class E2eTestBase {
                                                 .getClassLoader()
                                                 
.getResource("docker-compose.yaml")
                                                 .toURI()))
+                        .withEnv("NETWORK_ID", network.getId())
                         .withLogConsumer("jobmanager_1", new LogConsumer(LOG))
-                        .withLogConsumer("taskmanager_1", new 
LogConsumer(LOG));
+                        .withLogConsumer("taskmanager_1", new LogConsumer(LOG))
+                        .withLocalCompose(true);
         if (withKafka) {
             List<String> kafkaServices = Arrays.asList("zookeeper", "kafka");
             services.addAll(kafkaServices);
@@ -142,6 +150,9 @@ public abstract class E2eTestBase {
         if (environment != null) {
             environment.stop();
         }
+        if (network != null) {
+            network.close();
+        }
     }
 
     protected void writeSharedFile(String filename, String content) throws 
Exception {
@@ -200,7 +211,11 @@ public abstract class E2eTestBase {
                         topicName, tmpDir, filename));
     }
 
-    protected void runSql(String sql) throws Exception {
+    private static final Pattern JOB_ID_PATTERN =
+            Pattern.compile(
+                    "SQL update statement has been successfully submitted to 
the cluster:\\s+Job ID: (\\S+)");
+
+    protected String runSql(String sql) throws Exception {
         String fileName = UUID.randomUUID() + ".sql";
         writeSharedFile(fileName, sql);
         Container.ExecResult execResult =
@@ -214,6 +229,13 @@ public abstract class E2eTestBase {
         if (execResult.getExitCode() != 0) {
             throw new AssertionError("Failed when submitting the SQL job.");
         }
+
+        Matcher matcher = JOB_ID_PATTERN.matcher(execResult.getStdout());
+        if (matcher.find()) {
+            return matcher.group(1);
+        } else {
+            return null;
+        }
     }
 
     protected String createResultSink(String sinkName, String schema) {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
new file mode 100644
index 000000000..732f1eee2
--- /dev/null
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+
+/**
+ * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
+ * 5.7.
+ */
+public class MySql57E2eTest extends MySqlCdcE2eTestBase {
+
+    public MySql57E2eTest() {
+        super(MySqlVersion.V5_7);
+    }
+}
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
new file mode 100644
index 000000000..c0ecd90d7
--- /dev/null
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+
+/**
+ * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
+ * 8.0.
+ */
+public class MySql80E2eTest extends MySqlCdcE2eTestBase {
+
+    public MySql80E2eTest() {
+        super(MySqlVersion.V8_0);
+    }
+}
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
new file mode 100644
index 000000000..a0bede8a8
--- /dev/null
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+import org.apache.paimon.tests.E2eTestBase;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/** E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction}. */
+public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlCdcE2eTestBase.class);
+
+    private static final String USER = "paimonuser";
+    private static final String PASSWORD = "paimonpw";
+    private static final String DATABASE_NAME = "paimon_test";
+
+    private final MySqlVersion mySqlVersion;
+    private MySqlContainer mySqlContainer;
+
+    private String warehousePath;
+    private String catalogDdl;
+    private String useCatalogCmd;
+
+    protected MySqlCdcE2eTestBase(MySqlVersion mySqlVersion) {
+        this.mySqlVersion = mySqlVersion;
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        mySqlContainer = createMySqlContainer(mySqlVersion);
+        Startables.deepStart(Stream.of(mySqlContainer)).join();
+
+        warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
+        catalogDdl =
+                String.format(
+                        "CREATE CATALOG ts_catalog WITH (\n"
+                                + "    'type' = 'paimon',\n"
+                                + "    'warehouse' = '%s'\n"
+                                + ");",
+                        warehousePath);
+
+        useCatalogCmd = "USE CATALOG ts_catalog;";
+    }
+
+    private MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return (MySqlContainer)
+                new MySqlContainer(version)
+                        .withConfigurationOverride("mysql/my.cnf")
+                        .withSetupSQL("mysql/setup.sql")
+                        .withUsername(USER)
+                        .withPassword(PASSWORD)
+                        .withDatabaseName(DATABASE_NAME)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG))
+                        // connect with docker-compose.yaml
+                        .withNetwork(network)
+                        .withNetworkAliases("mysql-1");
+    }
+
+    @AfterEach
+    public void after() {
+        mySqlContainer.stop();
+        super.after();
+    }
+
+    @Test
+    public void testSyncTable() throws Exception {
+        String runActionCommand =
+                String.join(
+                        " ",
+                        "bin/flink",
+                        "run",
+                        "-c",
+                        "org.apache.paimon.flink.action.FlinkActions",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/paimon-flink.jar",
+                        "mysql-sync-table",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "ts_table",
+                        "--partition-keys",
+                        "pt",
+                        "--primary-keys",
+                        "pt,_id",
+                        "--sink-parallelism",
+                        "2",
+                        "--mysql-conf",
+                        "hostname=mysql-1",
+                        "--mysql-conf",
+                        String.format("port=%d", MySqlContainer.MYSQL_PORT),
+                        "--mysql-conf",
+                        String.format("username='%s'", 
mySqlContainer.getUsername()),
+                        "--mysql-conf",
+                        String.format("password='%s'", 
mySqlContainer.getPassword()),
+                        "--mysql-conf",
+                        String.format("database-name='%s'", DATABASE_NAME),
+                        "--mysql-conf",
+                        "table-name='schema_evolution_.+'",
+                        "--paimon-conf",
+                        "bucket=2");
+        Container.ExecResult execResult =
+                jobManager.execInContainer("su", "flink", "-c", 
runActionCommand);
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        String.format(
+                                "jdbc:mysql://%s:%s/",
+                                mySqlContainer.getHost(), 
mySqlContainer.getDatabasePort()),
+                        mySqlContainer.getUsername(),
+                        mySqlContainer.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testSyncTableImpl(statement);
+            }
+        }
+    }
+
+    private void testSyncTableImpl(Statement statement) throws Exception {
+        statement.executeUpdate("USE paimon_test");
+
+        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 
'one')");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 
'four')");
+
+        String jobId =
+                runSql(
+                        "INSERT INTO result1 SELECT * FROM ts_table;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result1", "pt INT, _id INT, v1 
VARCHAR(10)"));
+        checkResult("1, 1, one", "1, 2, two", "2, 4, four");
+        clearCurrentResults();
+        Container.ExecResult execResult = 
jobManager.execInContainer("bin/flink", "cancel", jobId);
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 
INT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), 
(1, 5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 
INT");
+        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 
'six', 60)");
+
+        jobId =
+                runSql(
+                        "INSERT INTO result2 SELECT * FROM ts_table;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result2", "pt INT, _id INT, v1 
VARCHAR(10), v2 INT"));
+        checkResult(
+                "1, 1, one, null",
+                "1, 2, two, null",
+                "2, 3, three, 30",
+                "2, 4, four, null",
+                "1, 5, five, 50",
+                "1, 6, six, 60");
+        clearCurrentResults();
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
+
+        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN 
v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 
70000000000)");
+        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 
30000000000 WHERE _id = 3");
+        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN 
v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 
80000000000)");
+
+        jobId =
+                runSql(
+                        "INSERT INTO result3 SELECT * FROM ts_table;",
+                        catalogDdl,
+                        useCatalogCmd,
+                        "",
+                        createResultSink("result3", "pt INT, _id INT, v1 
VARCHAR(10), v2 BIGINT"));
+        checkResult(
+                "1, 1, one, null",
+                "1, 2, two, null",
+                "2, 3, three, 30000000000",
+                "2, 4, four, null",
+                "1, 5, five, 50",
+                "1, 6, six, 60",
+                "2, 7, seven, 70000000000",
+                "2, 8, eight, 80000000000");
+        clearCurrentResults();
+        jobManager.execInContainer("bin/flink", "cancel", jobId);
+    }
+
+    private String runSql(String sql, String... ddls) throws Exception {
+        return runSql(String.join("\n", ddls) + "\n" + sql);
+    }
+}
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml 
b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 88d9c6857..a24999e47 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -31,7 +31,7 @@ services:
       - /tmp/paimon-e2e-tests-jars:/jars
     entrypoint: >
       /bin/bash -c "
-      cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar
+      cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
       /jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar 
/opt/flink/lib ;
       echo 'See FLINK-31659 for why we need the following two steps' ;
       mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
@@ -54,7 +54,7 @@ services:
       - /tmp/paimon-e2e-tests-jars:/jars
     entrypoint: >
       /bin/bash -c "
-      cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar
+      cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
       /jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar 
/opt/flink/lib ;
       echo 'See FLINK-31659 for why we need the following two steps' ;
       mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
@@ -222,3 +222,5 @@ volumes:
 
 networks:
   testnetwork:
+    name: ${NETWORK_ID}
+    external: true
diff --git a/paimon-e2e-tests/src/test/resources/mysql/my.cnf 
b/paimon-e2e-tests/src/test/resources/mysql/my.cnf
new file mode 100644
index 000000000..9c9a8747b
--- /dev/null
+++ b/paimon-e2e-tests/src/test/resources/mysql/my.cnf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but 
would
+# be longer on a production system. Row-level info is required for ingest to 
work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+
+# If this option is off
+# MySQL will set current timestamp to any timestamp field with NULL value
+# which is bad for our tests
+explicit_defaults_for_timestamp = ON
diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql 
b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
new file mode 100644
index 000000000..1477afa49
--- /dev/null
+++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
@@ -0,0 +1,42 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly limit the replication user must be 
on the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For 
example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'paimonuser' - all privileges required by the snapshot reader AND binlog 
reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, 
LOCK TABLES  ON *.* TO 'paimonuser'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+USE paimon_test;
+
+CREATE TABLE schema_evolution_1 (
+    pt INT,
+    _id INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_2 (
+    pt INT,
+    _id INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (_id)
+);
diff --git 
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
 
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
new file mode 100644
index 000000000..c7bf42a64
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * Utility methods for {@link SingleOutputStreamOperator}.
+ *
+ * <p>TODO: merge all Flink version related utils into one class.
+ */
+public class SingleOutputStreamOperatorUtils {
+
+    public static <T> DataStream<T> getSideOutput(
+            SingleOutputStreamOperator<?> input, OutputTag<T> outputTag) {
+        return input.getSideOutput(outputTag);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
new file mode 100644
index 000000000..c7bf42a64
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * Utility methods for {@link SingleOutputStreamOperator}.
+ *
+ * <p>TODO: merge all Flink version related utils into one class.
+ */
+public class SingleOutputStreamOperatorUtils {
+
+    public static <T> DataStream<T> getSideOutput(
+            SingleOutputStreamOperator<?> input, OutputTag<T> outputTag) {
+        return input.getSideOutput(outputTag);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 2dad11255..69c5ba799 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -294,6 +294,18 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+
            <!--
                Package this module's test classes as a test-jar so other modules can
                declare a test-scoped dependency on them (presumably consumed by the
                e2e-test module added in this change; confirm against
                paimon-e2e-tests/pom.xml).
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>test-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 05ffe854e..8ef2d391e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -116,6 +117,8 @@ public interface Action {
         private static final String DROP_PARTITION = "drop-partition";
         private static final String DELETE = "delete";
         private static final String MERGE_INTO = "merge-into";
+        // cdc actions
+        private static final String MYSQL_SYNC_TABLE = "mysql-sync-table";
 
         public static Optional<Action> create(String[] args) {
             String action = args[0].toLowerCase();
@@ -130,6 +133,8 @@ public interface Action {
                     return DeleteAction.create(actionArgs);
                 case MERGE_INTO:
                     return MergeIntoAction.create(actionArgs);
+                case MYSQL_SYNC_TABLE:
+                    return MySqlSyncTableAction.create(actionArgs);
                 default:
                     System.err.println("Unknown action \"" + action + "\"");
                     printHelp();
@@ -146,6 +151,7 @@ public interface Action {
             System.out.println("  " + DROP_PARTITION);
             System.out.println("  " + DELETE);
             System.out.println("  " + MERGE_INTO);
+            System.out.println("  " + MYSQL_SYNC_TABLE);
 
             System.out.println("For detailed options of each action, run 
<action> --help");
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 9a7710840..ee6b51fb1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -44,6 +44,9 @@ import 
com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 
@@ -51,9 +54,10 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
-import java.time.Duration;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -62,6 +66,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * An {@link Action} which synchronize one or multiple MySQL tables into one 
Paimon table.
@@ -95,7 +100,7 @@ import java.util.regex.Pattern;
  */
 public class MySqlSyncTableAction implements Action {
 
-    private final Map<String, String> mySqlConfig;
+    private final Configuration mySqlConfig;
     private final String warehouse;
     private final String database;
     private final String table;
@@ -111,7 +116,7 @@ public class MySqlSyncTableAction implements Action {
             List<String> partitionKeys,
             List<String> primaryKeys,
             Map<String, String> paimonConfig) {
-        this.mySqlConfig = mySqlConfig;
+        this.mySqlConfig = Configuration.fromMap(mySqlConfig);
         this.warehouse = warehouse;
         this.database = database;
         this.table = table;
@@ -120,13 +125,6 @@ public class MySqlSyncTableAction implements Action {
         this.paimonConfig = paimonConfig;
     }
 
-    @Override
-    public void run() throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        build(env);
-        env.execute(String.format("MySQL CTAS: %s.%s", database, table));
-    }
-
     public void build(StreamExecutionEnvironment env) throws Exception {
         MySqlSource<String> source = buildSource();
         MySqlSchema mySqlSchema =
@@ -162,7 +160,7 @@ public class MySqlSyncTableAction implements Action {
         }
 
         EventParser.Factory<String> parserFactory;
-        String serverTimeZone = mySqlConfig.get("server-time-zone");
+        String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         if (serverTimeZone != null) {
             parserFactory = () -> new 
MySqlDebeziumJsonEventParser(ZoneId.of(serverTimeZone));
         } else {
@@ -186,32 +184,37 @@ public class MySqlSyncTableAction implements Action {
     private MySqlSource<String> buildSource() {
         MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
 
-        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key());
-        String tableName = 
mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key());
+        String databaseName = 
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+        String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
         sourceBuilder
-                .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()))
-                
.port(Integer.parseInt(mySqlConfig.get(MySqlSourceOptions.PORT.key())))
-                .username(mySqlConfig.get(MySqlSourceOptions.USERNAME.key()))
-                .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD.key()))
+                .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
+                .port(mySqlConfig.get(MySqlSourceOptions.PORT))
+                .username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
+                .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
                 .databaseList(databaseName)
                 .tableList(databaseName + "." + tableName);
 
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_ID.key()))
-                .ifPresent(sourceBuilder::serverId);
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE.key()))
+        
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
                 .ifPresent(sourceBuilder::serverTimeZone);
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key()))
-                .ifPresent(size -> 
sourceBuilder.fetchSize(Integer.parseInt(size)));
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_TIMEOUT.key()))
-                .ifPresent(timeout -> 
sourceBuilder.connectTimeout(Duration.parse(timeout)));
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_MAX_RETRIES.key()))
-                .ifPresent(retries -> 
sourceBuilder.connectMaxRetries(Integer.parseInt(retries)));
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECTION_POOL_SIZE.key()))
-                .ifPresent(size -> 
sourceBuilder.connectionPoolSize(Integer.parseInt(size)));
-        
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.HEARTBEAT_INTERVAL.key()))
-                .ifPresent(interval -> 
sourceBuilder.heartbeatInterval(Duration.parse(interval)));
-
-        String startupMode = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE.key());
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
+                .ifPresent(sourceBuilder::fetchSize);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
+                .ifPresent(sourceBuilder::connectTimeout);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
+                .ifPresent(sourceBuilder::connectMaxRetries);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
+                .ifPresent(sourceBuilder::connectionPoolSize);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
+                .ifPresent(sourceBuilder::heartbeatInterval);
+
+        String startupMode = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
         // see
         // 
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
         if ("initial".equalsIgnoreCase(startupMode)) {
@@ -222,40 +225,30 @@ public class MySqlSyncTableAction implements Action {
             sourceBuilder.startupOptions(StartupOptions.latest());
         } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
             BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
-            String file =
-                    
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key());
-            String pos = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key());
+            String file = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+            Long pos = 
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
             if (file != null && pos != null) {
-                offsetBuilder.setBinlogFilePosition(file, Long.parseLong(pos));
+                offsetBuilder.setBinlogFilePosition(file, pos);
             }
-            Optional.ofNullable(
-                            mySqlConfig.get(
-                                    
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key()))
+            mySqlConfig
+                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
                     .ifPresent(offsetBuilder::setGtidSet);
-            Optional.ofNullable(
-                            mySqlConfig.get(
-                                    
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS
-                                            .key()))
-                    .ifPresent(
-                            skipEvents -> 
offsetBuilder.setSkipEvents(Long.parseLong(skipEvents)));
-            Optional.ofNullable(
-                            mySqlConfig.get(
-                                    
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS
-                                            .key()))
-                    .ifPresent(skipRows -> 
offsetBuilder.setSkipRows(Long.parseLong(skipRows)));
+            mySqlConfig
+                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+                    .ifPresent(offsetBuilder::setSkipEvents);
+            mySqlConfig
+                    
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+                    .ifPresent(offsetBuilder::setSkipRows);
             
sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
         } else if ("timestamp".equalsIgnoreCase(startupMode)) {
             sourceBuilder.startupOptions(
                     StartupOptions.timestamp(
-                            Long.parseLong(
-                                    mySqlConfig.get(
-                                            
MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS
-                                                    .key()))));
+                            
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
         }
 
         Properties jdbcProperties = new Properties();
         Properties debeziumProperties = new Properties();
-        for (Map.Entry<String, String> entry : mySqlConfig.entrySet()) {
+        for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet()) 
{
             String key = entry.getKey();
             String value = entry.getValue();
             if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
@@ -277,18 +270,17 @@ public class MySqlSyncTableAction implements Action {
 
     private List<MySqlSchema> getMySqlSchemaList() throws Exception {
         Pattern databasePattern =
-                
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key()));
-        Pattern tablePattern =
-                
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key()));
+                
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
+        Pattern tablePattern = 
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
         List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
         try (Connection conn =
                 DriverManager.getConnection(
                         String.format(
-                                "jdbc:mysql://%s:%s/",
-                                
mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()),
-                                
mySqlConfig.get(MySqlSourceOptions.PORT.key())),
-                        mySqlConfig.get(MySqlSourceOptions.USERNAME.key()),
-                        mySqlConfig.get(MySqlSourceOptions.PASSWORD.key()))) {
+                                "jdbc:mysql://%s:%d/",
+                                mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
+                                mySqlConfig.get(MySqlSourceOptions.PORT)),
+                        mySqlConfig.get(MySqlSourceOptions.USERNAME),
+                        mySqlConfig.get(MySqlSourceOptions.PASSWORD))) {
             DatabaseMetaData metaData = conn.getMetaData();
             try (ResultSet schemas = metaData.getCatalogs()) {
                 while (schemas.next()) {
@@ -427,4 +419,139 @@ public class MySqlSyncTableAction implements Action {
             return this;
         }
     }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    public static Optional<Action> create(String[] args) {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        Tuple3<String, String, String> tablePath = Action.getTablePath(params);
+        if (tablePath == null) {
+            return Optional.empty();
+        }
+
+        List<String> partitionKeys = Collections.emptyList();
+        if (params.has("partition-keys")) {
+            partitionKeys =
+                    Arrays.stream(params.get("partition-keys").split(","))
+                            .collect(Collectors.toList());
+        }
+
+        List<String> primaryKeys = Collections.emptyList();
+        if (params.has("primary-keys")) {
+            primaryKeys =
+                    Arrays.stream(params.get("primary-keys").split(","))
+                            .collect(Collectors.toList());
+        }
+
+        Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
+        Map<String, String> paimonConfig = getConfigMap(params, "paimon-conf");
+        if (mySqlConfig == null || paimonConfig == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        tablePath.f0,
+                        tablePath.f1,
+                        tablePath.f2,
+                        partitionKeys,
+                        primaryKeys,
+                        paimonConfig));
+    }
+
+    private static Map<String, String> getConfigMap(MultipleParameterTool 
params, String key) {
+        Map<String, String> map = new HashMap<>();
+
+        for (String param : params.getMultiParameter(key)) {
+            String[] kv = param.split("=");
+            if (kv.length == 2) {
+                map.put(kv[0], kv[1]);
+                continue;
+            }
+
+            System.err.println(
+                    "Invalid " + key + " " + param + ".\nRun mysql-sync-table 
--help for help.");
+            return null;
+        }
+        return map;
+    }
+
+    private static void printHelp() {
+        System.out.println(
+                "Action \"mysql-sync-table\" creates a streaming job "
+                        + "with a Flink MySQL CDC source and a Paimon table 
sink to consume CDC events.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  mysql-sync-table --warehouse <warehouse-path> --database 
<database-name> "
+                        + "--table <table-name> "
+                        + "[--partition-keys <partition-keys>] "
+                        + "[--primary-keys <primary-keys>] "
+                        + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...]] "
+                        + "[--paimon-conf <paimon-table-sink-conf> 
[--paimon-conf <paimon-table-sink-conf> ...]]");
+        System.out.println();
+
+        System.out.println("Partition keys syntax:");
+        System.out.println("  key1,key2,...");
+        System.out.println(
+                "If partition key is not defined and the specified Paimon 
table does not exist, "
+                        + "this action will automatically create an 
unpartitioned Paimon table.");
+        System.out.println();
+
+        System.out.println("Primary keys syntax:");
+        System.out.println("  key1,key2,...");
+        System.out.println("Primary keys will be derived from MySQL tables if 
not specified.");
+        System.out.println();
+
+        System.out.println("MySQL CDC source conf syntax:");
+        System.out.println("  key=value");
+        System.out.println(
+                "'hostname', 'username', 'password', 'database-name' and 
'table-name' "
+                        + "are required configurations, others are optional.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options";);
+        System.out.println();
+
+        System.out.println("Paimon table sink conf syntax:");
+        System.out.println("  key=value");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see 
https://paimon.apache.org/docs/master/maintenance/configurations/";);
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  mysql-sync-table \\"
+                        + "    --warehouse hdfs:///path/to/warehouse \\"
+                        + "    --database test_db \\"
+                        + "    --table test_table \\"
+                        + "    --partition-keys pt \\"
+                        + "    --primary-keys pt,uid \\"
+                        + "    --mysql-conf hostname=127.0.0.1 \\"
+                        + "    --mysql-conf username=root \\"
+                        + "    --mysql-conf password=123456 \\"
+                        + "    --mysql-conf database-name=source_db \\"
+                        + "    --mysql-conf table-name='source_table_.*' \\"
+                        + "    --paimon-conf bucket=4 \\"
+                        + "    --paimon-conf changelog-producer=input \\"
+                        + "    --paimon-conf sink.parallelism=4 \\");
+    }
+
    /**
     * Builds the synchronization topology on a fresh {@link StreamExecutionEnvironment}
     * and submits it as a streaming job named after the target Paimon table.
     */
    @Override
    public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        build(env);
        env.execute(String.format("MySQL-Paimon Table Sync: %s.%s", database, table));
    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
index acee7b911..bd9710070 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -89,7 +90,8 @@ public class FlinkCdcSinkBuilder<T> {
                         .setParallelism(input.getParallelism());
 
         DataStream<Void> schemaChangeProcessFunction =
-                
parsed.getSideOutput(CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
+                SingleOutputStreamOperatorUtils.getSideOutput(
+                                parsed, 
CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
                         .process(
                                 new SchemaChangeProcessFunction(
                                         new SchemaManager(table.fileIO(), 
table.location())));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
new file mode 100644
index 000000000..c7bf42a64
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * Utility methods for {@link SingleOutputStreamOperator}.
+ *
+ * <p>TODO: merge all Flink version related utils into one class.
+ */
+public class SingleOutputStreamOperatorUtils {
+
+    public static <T> DataStream<T> getSideOutput(
+            SingleOutputStreamOperator<?> input, OutputTag<T> outputTag) {
+        return input.getSideOutput(outputTag);
+    }
+}

Reply via email to