This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1e58f3db5 [flink] MySqlSyncTableAction now supports flink run (#824)
1e58f3db5 is described below
commit 1e58f3db533b113ab5b26ed31f6042d0fe5ef84e
Author: tsreaper <[email protected]>
AuthorDate: Tue Apr 4 17:14:09 2023 +0800
[flink] MySqlSyncTableAction now supports flink run (#824)
---
.github/workflows/e2e-tests-1.14-jdk11.yml | 2 +-
.github/workflows/e2e-tests-1.14.yml | 2 +-
.github/workflows/e2e-tests-1.15-jdk11.yml | 2 +-
.github/workflows/e2e-tests-1.15.yml | 2 +-
docs/content/engines/hive.md | 2 +-
paimon-e2e-tests/pom.xml | 62 +++++
.../java/org/apache/paimon/tests/E2eTestBase.java | 26 ++-
.../apache/paimon/tests/cdc/MySql57E2eTest.java | 32 +++
.../apache/paimon/tests/cdc/MySql80E2eTest.java | 32 +++
.../paimon/tests/cdc/MySqlCdcE2eTestBase.java | 230 +++++++++++++++++++
.../test/resources-filtered/docker-compose.yaml | 6 +-
paimon-e2e-tests/src/test/resources/mysql/my.cnf | 65 ++++++
.../src/test/resources/mysql/setup.sql | 42 ++++
.../utils/SingleOutputStreamOperatorUtils.java | 36 +++
.../utils/SingleOutputStreamOperatorUtils.java | 36 +++
paimon-flink/paimon-flink-common/pom.xml | 12 +
.../org/apache/paimon/flink/action/Action.java | 6 +
.../action/cdc/mysql/MySqlSyncTableAction.java | 253 ++++++++++++++++-----
.../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java | 4 +-
.../utils/SingleOutputStreamOperatorUtils.java | 36 +++
20 files changed, 815 insertions(+), 73 deletions(-)
diff --git a/.github/workflows/e2e-tests-1.14-jdk11.yml
b/.github/workflows/e2e-tests-1.14-jdk11.yml
index 8b30734f1..fcc1463c9 100644
--- a/.github/workflows/e2e-tests-1.14-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.14-jdk11.yml
@@ -45,7 +45,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.14
- run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.14
+ run: ./mvnw clean install -DskipTests -Pflink-1.14
- name: Test Flink 1.14
run: |
# run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/e2e-tests-1.14.yml
b/.github/workflows/e2e-tests-1.14.yml
index 9eecd9da3..4fd19e1f8 100644
--- a/.github/workflows/e2e-tests-1.14.yml
+++ b/.github/workflows/e2e-tests-1.14.yml
@@ -41,7 +41,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.14
- run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.14
+ run: ./mvnw clean install -DskipTests -Pflink-1.14
- name: Test Flink 1.14
timeout-minutes: 60
run: |
diff --git a/.github/workflows/e2e-tests-1.15-jdk11.yml
b/.github/workflows/e2e-tests-1.15-jdk11.yml
index e46173118..48ae5a338 100644
--- a/.github/workflows/e2e-tests-1.15-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.15-jdk11.yml
@@ -45,7 +45,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.15
- run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.15
+ run: ./mvnw clean install -DskipTests -Pflink-1.15
- name: Test Flink 1.15
run: |
# run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/e2e-tests-1.15.yml
b/.github/workflows/e2e-tests-1.15.yml
index 371922a4d..df211a942 100644
--- a/.github/workflows/e2e-tests-1.15.yml
+++ b/.github/workflows/e2e-tests-1.15.yml
@@ -41,7 +41,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.15
- run: ./mvnw clean install -Dmaven.test.skip=true -Pflink-1.15
+ run: ./mvnw clean install -DskipTests -Pflink-1.15
- name: Test Flink 1.15
timeout-minutes: 60
run: |
diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index cb4db04fa..c844a3f5e 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -69,7 +69,7 @@ You can also manually build bundled jar from the source code.
To build from source code, [clone the git repository]({{< github_repo >}}).
Build bundled jar with the following command.
-`mvn clean install -Dmaven.test.skip=true`
+`mvn clean install -DskipTests`
You can find Hive connector jar in
`./paimon-hive/paimon-hive-connector-<hive-version>/target/paimon-hive-connector-<hive-version>-{{< version >}}.jar`.
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 943f3da05..af3c2706d 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
+ <flink.cdc.version>2.3.0</flink.cdc.version>
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
</properties>
@@ -61,6 +62,21 @@ under the License.
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-mysql-cdc</artifactId>
+ <version>${flink.cdc.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- testcontainers -->
<dependency>
@@ -76,6 +92,13 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -134,6 +157,16 @@ under the License.
<outputDirectory>/tmp/paimon-e2e-tests-jars
</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>com.ververica</groupId>
+
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
+ <version>${flink.cdc.version}</version>
+ <destFileName>mysql-cdc.jar</destFileName>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>/tmp/paimon-e2e-tests-jars
+ </outputDirectory>
+ </artifactItem>
<!-- test paimon with kafka sql jar -->
<artifactItem>
<groupId>org.apache.flink</groupId>
@@ -159,7 +192,36 @@ under the License.
</artifactItems>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>3.1.0</version>
+ <executions>
+ <execution>
+ <id>FLINK-31695</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <!--
+ Due to FLINK-31695, we need to delete conflicting classes from the jar file.
+ Remove this workaround once we build our own e2e Docker image
+ with both Flink and Hadoop in the same image.
+ -->
+ <executable>zip</executable>
+ <arguments>
+ <argument>-d</argument>
+
<argument>/tmp/paimon-e2e-tests-jars/bundled-hadoop.jar</argument>
+
<argument>org/apache/commons/cli/CommandLine.class</argument>
+ </arguments>
+ </configuration>
+ </plugin>
</plugins>
+
<resources>
<resource>
<directory>src/test/resources</directory>
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
index f716e25c5..a3c66ef6c 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
@@ -39,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.fail;
@@ -77,6 +80,7 @@ public abstract class E2eTestBase {
private static final int CHECK_RESULT_RETRIES = 60;
private final List<String> currentResults = new ArrayList<>();
+ protected Network network;
protected DockerComposeContainer<?> environment;
protected ContainerState jobManager;
@@ -85,6 +89,8 @@ public abstract class E2eTestBase {
List<String> services = new ArrayList<>();
services.add("jobmanager");
services.add("taskmanager");
+
+ network = Network.newNetwork();
environment =
new DockerComposeContainer<>(
new File(
@@ -92,8 +98,10 @@ public abstract class E2eTestBase {
.getClassLoader()
.getResource("docker-compose.yaml")
.toURI()))
+ .withEnv("NETWORK_ID", network.getId())
.withLogConsumer("jobmanager_1", new LogConsumer(LOG))
- .withLogConsumer("taskmanager_1", new
LogConsumer(LOG));
+ .withLogConsumer("taskmanager_1", new LogConsumer(LOG))
+ .withLocalCompose(true);
if (withKafka) {
List<String> kafkaServices = Arrays.asList("zookeeper", "kafka");
services.addAll(kafkaServices);
@@ -142,6 +150,9 @@ public abstract class E2eTestBase {
if (environment != null) {
environment.stop();
}
+ if (network != null) {
+ network.close();
+ }
}
protected void writeSharedFile(String filename, String content) throws
Exception {
@@ -200,7 +211,11 @@ public abstract class E2eTestBase {
topicName, tmpDir, filename));
}
- protected void runSql(String sql) throws Exception {
+ private static final Pattern JOB_ID_PATTERN =
+ Pattern.compile(
+ "SQL update statement has been successfully submitted to
the cluster:\\s+Job ID: (\\S+)");
+
+ protected String runSql(String sql) throws Exception {
String fileName = UUID.randomUUID() + ".sql";
writeSharedFile(fileName, sql);
Container.ExecResult execResult =
@@ -214,6 +229,13 @@ public abstract class E2eTestBase {
if (execResult.getExitCode() != 0) {
throw new AssertionError("Failed when submitting the SQL job.");
}
+
+ Matcher matcher = JOB_ID_PATTERN.matcher(execResult.getStdout());
+ if (matcher.find()) {
+ return matcher.group(1);
+ } else {
+ return null;
+ }
}
protected String createResultSink(String sinkName, String schema) {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
new file mode 100644
index 000000000..732f1eee2
--- /dev/null
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+
+/**
+ * E2e tests for {@link org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
+ * 5.7.
+ */
+public class MySql57E2eTest extends MySqlCdcE2eTestBase {
+
+ public MySql57E2eTest() {
+ super(MySqlVersion.V5_7);
+ }
+}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
new file mode 100644
index 000000000..c0ecd90d7
--- /dev/null
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+
+/**
+ * E2e tests for {@link org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
+ * 8.0.
+ */
+public class MySql80E2eTest extends MySqlCdcE2eTestBase {
+
+ public MySql80E2eTest() {
+ super(MySqlVersion.V8_0);
+ }
+}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
new file mode 100644
index 000000000..a0bede8a8
--- /dev/null
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests.cdc;
+
+import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+import org.apache.paimon.tests.E2eTestBase;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/** E2e tests for {@link org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction}. */
+public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlCdcE2eTestBase.class);
+
+ private static final String USER = "paimonuser";
+ private static final String PASSWORD = "paimonpw";
+ private static final String DATABASE_NAME = "paimon_test";
+
+ private final MySqlVersion mySqlVersion;
+ private MySqlContainer mySqlContainer;
+
+ private String warehousePath;
+ private String catalogDdl;
+ private String useCatalogCmd;
+
+ protected MySqlCdcE2eTestBase(MySqlVersion mySqlVersion) {
+ this.mySqlVersion = mySqlVersion;
+ }
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ mySqlContainer = createMySqlContainer(mySqlVersion);
+ Startables.deepStart(Stream.of(mySqlContainer)).join();
+
+ warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
+ catalogDdl =
+ String.format(
+ "CREATE CATALOG ts_catalog WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s'\n"
+ + ");",
+ warehousePath);
+
+ useCatalogCmd = "USE CATALOG ts_catalog;";
+ }
+
+ private MySqlContainer createMySqlContainer(MySqlVersion version) {
+ return (MySqlContainer)
+ new MySqlContainer(version)
+ .withConfigurationOverride("mysql/my.cnf")
+ .withSetupSQL("mysql/setup.sql")
+ .withUsername(USER)
+ .withPassword(PASSWORD)
+ .withDatabaseName(DATABASE_NAME)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ // connect with docker-compose.yaml
+ .withNetwork(network)
+ .withNetworkAliases("mysql-1");
+ }
+
+ @AfterEach
+ public void after() {
+ mySqlContainer.stop();
+ super.after();
+ }
+
+ @Test
+ public void testSyncTable() throws Exception {
+ String runActionCommand =
+ String.join(
+ " ",
+ "bin/flink",
+ "run",
+ "-c",
+ "org.apache.paimon.flink.action.FlinkActions",
+ "-D",
+ "execution.checkpointing.interval=1s",
+ "--detached",
+ "lib/paimon-flink.jar",
+ "mysql-sync-table",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "ts_table",
+ "--partition-keys",
+ "pt",
+ "--primary-keys",
+ "pt,_id",
+ "--sink-parallelism",
+ "2",
+ "--mysql-conf",
+ "hostname=mysql-1",
+ "--mysql-conf",
+ String.format("port=%d", MySqlContainer.MYSQL_PORT),
+ "--mysql-conf",
+ String.format("username='%s'",
mySqlContainer.getUsername()),
+ "--mysql-conf",
+ String.format("password='%s'",
mySqlContainer.getPassword()),
+ "--mysql-conf",
+ String.format("database-name='%s'", DATABASE_NAME),
+ "--mysql-conf",
+ "table-name='schema_evolution_.+'",
+ "--paimon-conf",
+ "bucket=2");
+ Container.ExecResult execResult =
+ jobManager.execInContainer("su", "flink", "-c",
runActionCommand);
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format(
+ "jdbc:mysql://%s:%s/",
+ mySqlContainer.getHost(),
mySqlContainer.getDatabasePort()),
+ mySqlContainer.getUsername(),
+ mySqlContainer.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testSyncTableImpl(statement);
+ }
+ }
+ }
+
+ private void testSyncTableImpl(Statement statement) throws Exception {
+ statement.executeUpdate("USE paimon_test");
+
+ statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1,
'one')");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4,
'four')");
+
+ String jobId =
+ runSql(
+ "INSERT INTO result1 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result1", "pt INT, _id INT, v1
VARCHAR(10)"));
+ checkResult("1, 1, one", "1, 2, two", "2, 4, four");
+ clearCurrentResults();
+ Container.ExecResult execResult =
jobManager.execInContainer("bin/flink", "cancel", jobId);
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2
INT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30),
(1, 5, 'five', 50)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2
INT");
+ statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6,
'six', 60)");
+
+ jobId =
+ runSql(
+ "INSERT INTO result2 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result2", "pt INT, _id INT, v1
VARCHAR(10), v2 INT"));
+ checkResult(
+ "1, 1, one, null",
+ "1, 2, two, null",
+ "2, 3, three, 30",
+ "2, 4, four, null",
+ "1, 5, five, 50",
+ "1, 6, six, 60");
+ clearCurrentResults();
+ jobManager.execInContainer("bin/flink", "cancel", jobId);
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN
v2 BIGINT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven',
70000000000)");
+ statement.executeUpdate("UPDATE schema_evolution_1 SET v2 =
30000000000 WHERE _id = 3");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN
v2 BIGINT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight',
80000000000)");
+
+ jobId =
+ runSql(
+ "INSERT INTO result3 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ "",
+ createResultSink("result3", "pt INT, _id INT, v1
VARCHAR(10), v2 BIGINT"));
+ checkResult(
+ "1, 1, one, null",
+ "1, 2, two, null",
+ "2, 3, three, 30000000000",
+ "2, 4, four, null",
+ "1, 5, five, 50",
+ "1, 6, six, 60",
+ "2, 7, seven, 70000000000",
+ "2, 8, eight, 80000000000");
+ clearCurrentResults();
+ jobManager.execInContainer("bin/flink", "cancel", jobId);
+ }
+
+ private String runSql(String sql, String... ddls) throws Exception {
+ return runSql(String.join("\n", ddls) + "\n" + sql);
+ }
+}
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 88d9c6857..a24999e47 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -31,7 +31,7 @@ services:
- /tmp/paimon-e2e-tests-jars:/jars
entrypoint: >
/bin/bash -c "
- cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar
+ cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
/jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar
/opt/flink/lib ;
echo 'See FLINK-31659 for why we need the following two steps' ;
mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
@@ -54,7 +54,7 @@ services:
- /tmp/paimon-e2e-tests-jars:/jars
entrypoint: >
/bin/bash -c "
- cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar
+ cp /jars/paimon-flink.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
/jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar
/opt/flink/lib ;
echo 'See FLINK-31659 for why we need the following two steps' ;
mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
@@ -222,3 +222,5 @@ volumes:
networks:
testnetwork:
+ name: ${NETWORK_ID}
+ external: true
diff --git a/paimon-e2e-tests/src/test/resources/mysql/my.cnf
b/paimon-e2e-tests/src/test/resources/mysql/my.cnf
new file mode 100644
index 000000000..9c9a8747b
--- /dev/null
+++ b/paimon-e2e-tests/src/test/resources/mysql/my.cnf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+
+# If this option is off,
+# MySQL will set the current timestamp on any timestamp field with a NULL value,
+# which is bad for our tests.
+explicit_defaults_for_timestamp = ON
diff --git a/paimon-e2e-tests/src/test/resources/mysql/setup.sql
b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
new file mode 100644
index 000000000..1477afa49
--- /dev/null
+++ b/paimon-e2e-tests/src/test/resources/mysql/setup.sql
@@ -0,0 +1,42 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly require the replication user to connect only from the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'paimonuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
LOCK TABLES ON *.* TO 'paimonuser'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+USE paimon_test;
+
+CREATE TABLE schema_evolution_1 (
+ pt INT,
+ _id INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (_id)
+);
+
+CREATE TABLE schema_evolution_2 (
+ pt INT,
+ _id INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (_id)
+);
diff --git
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
new file mode 100644
index 000000000..c7bf42a64
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * Utility methods for {@link SingleOutputStreamOperator}.
+ *
+ * <p>TODO: merge all Flink version related utils into one class.
+ */
+public class SingleOutputStreamOperatorUtils {
+
+ public static <T> DataStream<T> getSideOutput(
+ SingleOutputStreamOperator<?> input, OutputTag<T> outputTag) {
+ return input.getSideOutput(outputTag);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
new file mode 100644
index 000000000..c7bf42a64
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * Utility methods for {@link SingleOutputStreamOperator}.
+ *
+ * <p>TODO: merge all Flink version related utils into one class.
+ */
+public class SingleOutputStreamOperatorUtils {
+
+ public static <T> DataStream<T> getSideOutput(
+ SingleOutputStreamOperator<?> input, OutputTag<T> outputTag) {
+ return input.getSideOutput(outputTag);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 2dad11255..69c5ba799 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -294,6 +294,18 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 05ffe854e..8ef2d391e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -116,6 +117,8 @@ public interface Action {
private static final String DROP_PARTITION = "drop-partition";
private static final String DELETE = "delete";
private static final String MERGE_INTO = "merge-into";
+ // cdc actions
+ private static final String MYSQL_SYNC_TABLE = "mysql-sync-table";
public static Optional<Action> create(String[] args) {
String action = args[0].toLowerCase();
@@ -130,6 +133,8 @@ public interface Action {
return DeleteAction.create(actionArgs);
case MERGE_INTO:
return MergeIntoAction.create(actionArgs);
+ case MYSQL_SYNC_TABLE:
+ return MySqlSyncTableAction.create(actionArgs);
default:
System.err.println("Unknown action \"" + action + "\"");
printHelp();
@@ -146,6 +151,7 @@ public interface Action {
System.out.println(" " + DROP_PARTITION);
System.out.println(" " + DELETE);
System.out.println(" " + MERGE_INTO);
+ System.out.println(" " + MYSQL_SYNC_TABLE);
System.out.println("For detailed options of each action, run
<action> --help");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 9a7710840..ee6b51fb1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -44,6 +44,9 @@ import
com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -51,9 +54,10 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
-import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -62,6 +66,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* An {@link Action} which synchronize one or multiple MySQL tables into one
Paimon table.
@@ -95,7 +100,7 @@ import java.util.regex.Pattern;
*/
public class MySqlSyncTableAction implements Action {
- private final Map<String, String> mySqlConfig;
+ private final Configuration mySqlConfig;
private final String warehouse;
private final String database;
private final String table;
@@ -111,7 +116,7 @@ public class MySqlSyncTableAction implements Action {
List<String> partitionKeys,
List<String> primaryKeys,
Map<String, String> paimonConfig) {
- this.mySqlConfig = mySqlConfig;
+ this.mySqlConfig = Configuration.fromMap(mySqlConfig);
this.warehouse = warehouse;
this.database = database;
this.table = table;
@@ -120,13 +125,6 @@ public class MySqlSyncTableAction implements Action {
this.paimonConfig = paimonConfig;
}
- @Override
- public void run() throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- build(env);
- env.execute(String.format("MySQL CTAS: %s.%s", database, table));
- }
-
public void build(StreamExecutionEnvironment env) throws Exception {
MySqlSource<String> source = buildSource();
MySqlSchema mySqlSchema =
@@ -162,7 +160,7 @@ public class MySqlSyncTableAction implements Action {
}
EventParser.Factory<String> parserFactory;
- String serverTimeZone = mySqlConfig.get("server-time-zone");
+ String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
if (serverTimeZone != null) {
parserFactory = () -> new
MySqlDebeziumJsonEventParser(ZoneId.of(serverTimeZone));
} else {
@@ -186,32 +184,37 @@ public class MySqlSyncTableAction implements Action {
private MySqlSource<String> buildSource() {
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
- String databaseName =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key());
- String tableName =
mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key());
+ String databaseName =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
+ String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
sourceBuilder
- .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()))
-
.port(Integer.parseInt(mySqlConfig.get(MySqlSourceOptions.PORT.key())))
- .username(mySqlConfig.get(MySqlSourceOptions.USERNAME.key()))
- .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD.key()))
+ .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
+ .port(mySqlConfig.get(MySqlSourceOptions.PORT))
+ .username(mySqlConfig.get(MySqlSourceOptions.USERNAME))
+ .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
.tableList(databaseName + "." + tableName);
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_ID.key()))
- .ifPresent(sourceBuilder::serverId);
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE.key()))
+
mySqlConfig.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
+ mySqlConfig
+ .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
.ifPresent(sourceBuilder::serverTimeZone);
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key()))
- .ifPresent(size ->
sourceBuilder.fetchSize(Integer.parseInt(size)));
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_TIMEOUT.key()))
- .ifPresent(timeout ->
sourceBuilder.connectTimeout(Duration.parse(timeout)));
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_MAX_RETRIES.key()))
- .ifPresent(retries ->
sourceBuilder.connectMaxRetries(Integer.parseInt(retries)));
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECTION_POOL_SIZE.key()))
- .ifPresent(size ->
sourceBuilder.connectionPoolSize(Integer.parseInt(size)));
-
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.HEARTBEAT_INTERVAL.key()))
- .ifPresent(interval ->
sourceBuilder.heartbeatInterval(Duration.parse(interval)));
-
- String startupMode =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE.key());
+ mySqlConfig
+ .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
+ .ifPresent(sourceBuilder::fetchSize);
+ mySqlConfig
+ .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
+ .ifPresent(sourceBuilder::connectTimeout);
+ mySqlConfig
+ .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
+ .ifPresent(sourceBuilder::connectMaxRetries);
+ mySqlConfig
+ .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
+ .ifPresent(sourceBuilder::connectionPoolSize);
+ mySqlConfig
+ .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
+ .ifPresent(sourceBuilder::heartbeatInterval);
+
+ String startupMode =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
// see
//
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
if ("initial".equalsIgnoreCase(startupMode)) {
@@ -222,40 +225,30 @@ public class MySqlSyncTableAction implements Action {
sourceBuilder.startupOptions(StartupOptions.latest());
} else if ("specific-offset".equalsIgnoreCase(startupMode)) {
BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
- String file =
-
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key());
- String pos =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key());
+ String file =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ Long pos =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
if (file != null && pos != null) {
- offsetBuilder.setBinlogFilePosition(file, Long.parseLong(pos));
+ offsetBuilder.setBinlogFilePosition(file, pos);
}
- Optional.ofNullable(
- mySqlConfig.get(
-
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key()))
+ mySqlConfig
+
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
.ifPresent(offsetBuilder::setGtidSet);
- Optional.ofNullable(
- mySqlConfig.get(
-
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS
- .key()))
- .ifPresent(
- skipEvents ->
offsetBuilder.setSkipEvents(Long.parseLong(skipEvents)));
- Optional.ofNullable(
- mySqlConfig.get(
-
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS
- .key()))
- .ifPresent(skipRows ->
offsetBuilder.setSkipRows(Long.parseLong(skipRows)));
+ mySqlConfig
+
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+ .ifPresent(offsetBuilder::setSkipEvents);
+ mySqlConfig
+
.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+ .ifPresent(offsetBuilder::setSkipRows);
sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
} else if ("timestamp".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(
StartupOptions.timestamp(
- Long.parseLong(
- mySqlConfig.get(
-
MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS
- .key()))));
+
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
}
Properties jdbcProperties = new Properties();
Properties debeziumProperties = new Properties();
- for (Map.Entry<String, String> entry : mySqlConfig.entrySet()) {
+ for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet())
{
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
@@ -277,18 +270,17 @@ public class MySqlSyncTableAction implements Action {
private List<MySqlSchema> getMySqlSchemaList() throws Exception {
Pattern databasePattern =
-
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key()));
- Pattern tablePattern =
-
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key()));
+
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
+ Pattern tablePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
try (Connection conn =
DriverManager.getConnection(
String.format(
- "jdbc:mysql://%s:%s/",
-
mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()),
-
mySqlConfig.get(MySqlSourceOptions.PORT.key())),
- mySqlConfig.get(MySqlSourceOptions.USERNAME.key()),
- mySqlConfig.get(MySqlSourceOptions.PASSWORD.key()))) {
+ "jdbc:mysql://%s:%d/",
+ mySqlConfig.get(MySqlSourceOptions.HOSTNAME),
+ mySqlConfig.get(MySqlSourceOptions.PORT)),
+ mySqlConfig.get(MySqlSourceOptions.USERNAME),
+ mySqlConfig.get(MySqlSourceOptions.PASSWORD))) {
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet schemas = metaData.getCatalogs()) {
while (schemas.next()) {
@@ -427,4 +419,139 @@ public class MySqlSyncTableAction implements Action {
return this;
}
}
+
+ // ------------------------------------------------------------------------
+ // Flink run methods
+ // ------------------------------------------------------------------------
+
+    /**
+     * Creates a {@link MySqlSyncTableAction} from command line arguments.
+     *
+     * <p>Recognized options are {@code --help}, the table path options consumed by
+     * {@link Action#getTablePath}, {@code --partition-keys}, {@code --primary-keys} (both
+     * comma-separated lists) and the repeatable {@code --mysql-conf} / {@code --paimon-conf}
+     * {@code key=value} options.
+     *
+     * @param args arguments following the action name
+     * @return the configured action, or an empty {@link Optional} if help was requested, the table
+     *     path could not be resolved, or a configuration entry was malformed
+     */
+    public static Optional<Action> create(String[] args) {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        Tuple3<String, String, String> tablePath = Action.getTablePath(params);
+        if (tablePath == null) {
+            return Optional.empty();
+        }
+
+        // Both key lists are optional on the command line and default to an empty list.
+        List<String> partitionKeys =
+                params.has("partition-keys")
+                        ? Arrays.stream(params.get("partition-keys").split(","))
+                                .collect(Collectors.toList())
+                        : Collections.emptyList();
+        List<String> primaryKeys =
+                params.has("primary-keys")
+                        ? Arrays.stream(params.get("primary-keys").split(","))
+                                .collect(Collectors.toList())
+                        : Collections.emptyList();
+
+        // Parse both maps before checking either, so every malformed option gets reported.
+        Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
+        Map<String, String> paimonConfig = getConfigMap(params, "paimon-conf");
+        if (mySqlConfig != null && paimonConfig != null) {
+            Action action =
+                    new MySqlSyncTableAction(
+                            mySqlConfig,
+                            tablePath.f0,
+                            tablePath.f1,
+                            tablePath.f2,
+                            partitionKeys,
+                            primaryKeys,
+                            paimonConfig);
+            return Optional.of(action);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Collects every occurrence of the repeatable {@code --<key> name=value} option into a map.
+     *
+     * <p>An option that does not appear on the command line yields an empty map. Each entry is
+     * split at its first {@code '='} only, so a value may itself contain {@code '='} and may be
+     * empty ({@code name=}).
+     *
+     * @param params parsed command line parameters
+     * @param key option name without the leading dashes, for example {@code mysql-conf}
+     * @return the collected entries, or {@code null} (after printing an error to stderr) if any
+     *     entry contains no {@code '='}
+     */
+    private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
+        Map<String, String> map = new HashMap<>();
+        // getMultiParameter returns null for an absent option; iterating over it would throw a
+        // NullPointerException, so an omitted optional option is treated as "no entries".
+        if (!params.has(key)) {
+            return map;
+        }
+
+        for (String param : params.getMultiParameter(key)) {
+            // Limit 2: split at the first '=' only, so "password=a=b" keeps "a=b" as its value.
+            String[] kv = param.split("=", 2);
+            if (kv.length != 2) {
+                System.err.println(
+                        "Invalid "
+                                + key
+                                + " "
+                                + param
+                                + ".\nRun mysql-sync-table --help for help.");
+                return null;
+            }
+            map.put(kv[0], kv[1]);
+        }
+        return map;
+    }
+
+    /**
+     * Prints the usage text of the {@code mysql-sync-table} action to standard output.
+     *
+     * <p>The example command is printed one argument per line, each joined to the next by a shell
+     * line continuation, and without a dangling continuation after the last argument.
+     */
+    private static void printHelp() {
+        System.out.println(
+                "Action \"mysql-sync-table\" creates a streaming job "
+                        + "with a Flink MySQL CDC source and a Paimon table sink to consume CDC events.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                " mysql-sync-table --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> "
+                        + "[--partition-keys <partition-keys>] "
+                        + "[--primary-keys <primary-keys>] "
+                        + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] "
+                        + "[--paimon-conf <paimon-table-sink-conf> [--paimon-conf <paimon-table-sink-conf> ...]]");
+        System.out.println();
+
+        System.out.println("Partition keys syntax:");
+        System.out.println(" key1,key2,...");
+        System.out.println(
+                "If partition key is not defined and the specified Paimon table does not exist, "
+                        + "this action will automatically create an unpartitioned Paimon table.");
+        System.out.println();
+
+        System.out.println("Primary keys syntax:");
+        System.out.println(" key1,key2,...");
+        System.out.println("Primary keys will be derived from MySQL tables if not specified.");
+        System.out.println();
+
+        System.out.println("MySQL CDC source conf syntax:");
+        System.out.println(" key=value");
+        System.out.println(
+                "'hostname', 'username', 'password', 'database-name' and 'table-name' "
+                        + "are required configurations, others are optional.");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
+        System.out.println();
+
+        System.out.println("Paimon table sink conf syntax:");
+        System.out.println(" key=value");
+        System.out.println(
+                "For a complete list of supported configurations, "
+                        + "see https://paimon.apache.org/docs/master/maintenance/configurations/");
+        System.out.println();
+
+        System.out.println("Examples:");
+        // Each argument goes on its own line, terminated by "\" + newline, so the printed command
+        // can be pasted into a shell as-is; the last argument carries no continuation.
+        System.out.println(
+                " mysql-sync-table \\\n"
+                        + "    --warehouse hdfs:///path/to/warehouse \\\n"
+                        + "    --database test_db \\\n"
+                        + "    --table test_table \\\n"
+                        + "    --partition-keys pt \\\n"
+                        + "    --primary-keys pt,uid \\\n"
+                        + "    --mysql-conf hostname=127.0.0.1 \\\n"
+                        + "    --mysql-conf username=root \\\n"
+                        + "    --mysql-conf password=123456 \\\n"
+                        + "    --mysql-conf database-name=source_db \\\n"
+                        + "    --mysql-conf table-name='source_table_.*' \\\n"
+                        + "    --paimon-conf bucket=4 \\\n"
+                        + "    --paimon-conf changelog-producer=input \\\n"
+                        + "    --paimon-conf sink.parallelism=4");
+    }
+
+    /**
+     * Builds the synchronization pipeline on the environment returned by {@link
+     * StreamExecutionEnvironment#getExecutionEnvironment()} and executes it as a job named after
+     * the target database and table.
+     *
+     * @throws Exception if building or executing the job fails
+     */
+    @Override
+    public void run() throws Exception {
+        String jobName = String.format("MySQL-Paimon Table Sync: %s.%s", database, table);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        env.execute(jobName);
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
index acee7b911..bd9710070 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -89,7 +90,8 @@ public class FlinkCdcSinkBuilder<T> {
.setParallelism(input.getParallelism());
DataStream<Void> schemaChangeProcessFunction =
-
parsed.getSideOutput(CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
.process(
new SchemaChangeProcessFunction(
new SchemaManager(table.fileIO(),
table.location())));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
new file mode 100644
index 000000000..c7bf42a64
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/SingleOutputStreamOperatorUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * Helper methods for working with {@link SingleOutputStreamOperator}.
+ *
+ * <p>TODO: merge all Flink version related utils into one class.
+ */
+public class SingleOutputStreamOperatorUtils {
+
+    /**
+     * Returns the side output of {@code input} that is selected by {@code outputTag}.
+     *
+     * <p>NOTE(review): presumably this indirection exists so that callers only ever see a plain
+     * {@link DataStream}, whatever the concrete return type of
+     * {@link SingleOutputStreamOperator#getSideOutput} is in the Flink version being built
+     * against -- confirm.
+     *
+     * @param input operator whose side output is requested
+     * @param outputTag tag identifying the side output
+     * @param <T> element type of the side output
+     * @return the side output stream for {@code outputTag}
+     */
+    public static <T> DataStream<T> getSideOutput(
+            SingleOutputStreamOperator<?> input, OutputTag<T> outputTag) {
+        DataStream<T> sideOutput = input.getSideOutput(outputTag);
+        return sideOutput;
+    }
+}