This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a3b3fc7f70 [INLONG-8967][Sort] Add Mysql connector on flink 1.15
(#8980)
a3b3fc7f70 is described below
commit a3b3fc7f70cb1e78335888daede75ebb44d62ca4
Author: EpicMo <[email protected]>
AuthorDate: Sat Oct 7 09:50:50 2023 +0800
[INLONG-8967][Sort] Add Mysql connector on flink 1.15 (#8980)
---
.../src/main/assemblies/sort-connectors-v1.15.xml | 8 +
inlong-sort/sort-core/pom.xml | 6 +
.../sort-end-to-end-tests-v1.15/pom.xml | 8 +
.../apache/inlong/sort/tests/MysqlToRocksTest.java | 170 +++++++
.../inlong/sort/tests/PostgresToStarRocksTest.java | 88 +---
.../sort/tests/utils/FlinkContainerTestEnv.java | 1 +
.../inlong/sort/tests/utils/MySqlContainer.java | 8 +-
.../sort/tests/utils/StarRocksContainer.java | 5 -
.../inlong/sort/tests/utils/StarRocksManager.java | 79 +++
.../src/test/resources/docker/mysql/my.cnf | 3 +-
.../src/test/resources/docker/mysql/setup.sql | 3 +-
.../src/test/resources/flinkSql/mysql_test.sql | 38 ++
.../sort-connectors/mysql-cdc/pom.xml | 178 +++++++
.../inlong/sort/mysql/MysqlTableFactory.java | 537 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../sort-flink-v1.15/sort-connectors/pom.xml | 1 +
.../sort/formats/json/utils/FormatJsonUtil.java | 1 +
17 files changed, 1072 insertions(+), 78 deletions(-)
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index f61fbc0e9e..184e727a91 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -51,5 +51,13 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-mysql-v1.15-${project.version}.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
</fileSets>
</assembly>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 0d2626b748..b18604b67d 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -257,6 +257,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 00cc1803ce..6841331770 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -165,6 +165,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.inlong</groupId>
+
<artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
+ <version>${project.version}</version>
+
<destFileName>sort-connector-mysql-cdc.jar</destFileName>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-starrocks-v1.15</artifactId>
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
new file mode 100644
index 0000000000..e02501cfd1
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
+import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.StarRocksContainer;
+import org.apache.inlong.sort.tests.utils.StarRocksManager;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+
+/**
+ * End-to-end tests for sort-connector-mysql-cdc-v1.15 uber jar.
+ * Test flink sql Mysql cdc to StarRocks
+ */
+public class MysqlToRocksTest extends FlinkContainerTestEnv {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MysqlToRocksTest.class);
+
+ private static final Path mysqlJar =
TestUtils.getResource("sort-connector-mysql-cdc.jar");
+ private static final Path jdbcJar =
TestUtils.getResource("sort-connector-starrocks.jar");
+ private static final Path mysqlJdbcJar =
TestUtils.getResource("mysql-driver.jar");
+ private static final String sqlFile;
+
+ static {
+ try {
+ sqlFile =
+
Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
+ StarRocksManager.buildStarRocksImage();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @ClassRule
+ public static StarRocksContainer STAR_ROCKS =
+ (StarRocksContainer) new
StarRocksContainer(getNewStarRocksImageName())
+ .withExposedPorts(9030, 8030, 8040)
+ .withNetwork(NETWORK)
+ .withAccessToHost(true)
+ .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+
+ @ClassRule
+ public static final MySqlContainer MYSQL_CONTAINER =
+ (MySqlContainer) new
MySqlContainer(MySqlContainer.MySqlVersion.V8_0)
+ .withDatabaseName("test")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mysql")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ @Before
+ public void setup() {
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ initializeMysqlTable();
+ initializeStarRocksTable(STAR_ROCKS);
+ }
+
+ private void initializeMysqlTable() {
+ try {
+ Class.forName(MYSQL_CONTAINER.getDriverClassName());
+ Connection conn = DriverManager
+ .getConnection(MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement stat = conn.createStatement();
+ stat.execute(
+ "CREATE TABLE test_input1 (\n"
+ + " id SERIAL,\n"
+ + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ + " description VARCHAR(512),\n"
+ + " PRIMARY KEY(id)\n"
+ + ");");
+ stat.close();
+ conn.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.stop();
+ }
+ if (STAR_ROCKS != null) {
+ STAR_ROCKS.stop();
+ }
+ }
+
+ /**
+ * Test flink sql mysql cdc to StarRocks
+ *
+ * @throws Exception The exception may be thrown when executing the case
+ */
+ @Test
+ public void testMysqlUpdateAndDelete() throws Exception {
+ submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+ waitUntilJobRunning(Duration.ofSeconds(10));
+
+ // generate input
+ try (Connection conn =
+ DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement stat = conn.createStatement()) {
+ stat.execute(
+ "INSERT INTO test_input1 "
+ + "VALUES (1,'jacket','water resistent white wind
breaker');");
+ stat.execute(
+ "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel
scooter ');");
+ stat.execute(
+ "update test_input1 set name = 'tom' where id = 2;");
+ stat.execute(
+ "delete from test_input1 where id = 1;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ JdbcProxy proxy =
+ new JdbcProxy(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
+ STAR_ROCKS.getPassword(),
+ STAR_ROCKS.getDriverClassName());
+ List<String> expectResult =
+ Arrays.asList("2,tom,Big 2-wheel scooter ");
+ proxy.checkResultWithTimeout(
+ expectResult,
+ "test_output1",
+ 3,
+ 60000L);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
index 499cc4b7f2..5c7208a7f6 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/PostgresToStarRocksTest.java
@@ -28,12 +28,9 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.MountableFile;
import java.net.URISyntaxException;
import java.nio.file.Path;
@@ -45,8 +42,12 @@ import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
-import java.util.stream.Stream;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Postgres cdc to StarRocks
@@ -58,72 +59,42 @@ public class PostgresToStarRocksTest extends
FlinkContainerTestEnv {
private static final Path postgresJar =
TestUtils.getResource("sort-connector-postgres-cdc.jar");
private static final Path jdbcJar =
TestUtils.getResource("sort-connector-starrocks.jar");
private static final Path mysqlJdbcJar =
TestUtils.getResource("mysql-driver.jar");
-
- private static final Logger STAR_ROCKS_LOG =
LoggerFactory.getLogger(StarRocksContainer.class);
-
private static final String sqlFile;
- //
----------------------------------------------------------------------------------------
- // StarRocks Variables
- //
----------------------------------------------------------------------------------------
- private static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks";
- private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks";
- private static final String NEW_STARROCKS_TAG = "latest";
- private static final String STAR_ROCKS_IMAGE_NAME =
"starrocks/allin1-ubi:3.0.4";
-
static {
try {
- sqlFile =
Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI()).toString();
+ sqlFile =
Paths.get(PostgresToStarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI())
+ .toString();
buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
- private static String getNewStarRocksImageName() {
- return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG;
- }
-
- public static void buildStarRocksImage() {
- GenericContainer oldStarRocks = new
GenericContainer(STAR_ROCKS_IMAGE_NAME);
- Startables.deepStart(Stream.of(oldStarRocks)).join();
-
oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
- "/data/deploy/");
- try {
- oldStarRocks.execInContainer("chmod", "+x",
"/data/deploy/start_fe_be.sh");
- } catch (Exception e) {
- e.printStackTrace();
- }
- oldStarRocks.getDockerClient()
- .commitCmd(oldStarRocks.getContainerId())
- .withRepository(NEW_STARROCKS_REPOSITORY)
- .withTag(NEW_STARROCKS_TAG).exec();
- oldStarRocks.stop();
- }
-
@ClassRule
- public static StarRocksContainer STAR_ROCKS = (StarRocksContainer) new
StarRocksContainer(getNewStarRocksImageName())
- .withExposedPorts(9030, 8030, 8040)
- .withNetwork(NETWORK)
- .withAccessToHost(true)
- .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
- .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+ public static StarRocksContainer STAR_ROCKS =
+ (StarRocksContainer) new
StarRocksContainer(getNewStarRocksImageName())
+ .withExposedPorts(9030, 8030, 8040)
+ .withNetwork(NETWORK)
+ .withAccessToHost(true)
+ .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
@ClassRule
public static final PostgreSQLContainer POSTGRES_CONTAINER =
(PostgreSQLContainer) new PostgreSQLContainer(
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"))
- .withUsername("flinkuser")
- .withPassword("flinkpw")
- .withDatabaseName("test")
- .withNetwork(NETWORK)
- .withNetworkAliases("postgres")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withDatabaseName("test")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgres")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
initializePostgresTable();
- initializeStarRocksTable();
+ initializeStarRocksTable(STAR_ROCKS);
}
private void initializePostgresTable() {
@@ -149,23 +120,6 @@ public class PostgresToStarRocksTest extends
FlinkContainerTestEnv {
}
}
- private void initializeStarRocksTable() {
- try (Connection conn =
- DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
- STAR_ROCKS.getPassword());
- Statement stat = conn.createStatement()) {
- stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n"
- + " id INT NOT NULL,\n"
- + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
- + " description VARCHAR(512)\n"
- + ")\n"
- + "PRIMARY KEY(id)\n"
- + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\"
= \"1\");");
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
@AfterClass
public static void teardown() {
if (POSTGRES_CONTAINER != null) {
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index c328ac951e..2426c57ae4 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -117,6 +117,7 @@ public abstract class FlinkContainerTestEnv extends
TestLogger {
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withExposedPorts(JOB_MANAGER_REST_PORT)
.withLogConsumer(new Slf4jLogConsumer(JM_LOG));
taskManager =
new GenericContainer<>("flink:1.15.4-scala_2.12")
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
index 11635e5619..75ba466f1d 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
@@ -49,7 +49,7 @@ public class MySqlContainer extends JdbcDatabaseContainer {
public MySqlContainer(MySqlVersion version) {
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
- addExposedPort(MYSQL_PORT);
+ addFixedExposedPort(33306, 3306);
}
@Override
@@ -60,8 +60,6 @@ public class MySqlContainer extends JdbcDatabaseContainer {
@Override
protected void configure() {
// HERE is the difference, copy to /etc/mysql/, if copy to
/etc/mysql/conf.d will be wrong
- optionallyMapResourceParameterAsVolume(
- MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/",
"mysql-default-conf");
if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
optionallyMapResourceParameterAsVolume(
@@ -79,6 +77,7 @@ public class MySqlContainer extends JdbcDatabaseContainer {
throw new ContainerLaunchException(
"Empty password can be used only with the root user");
}
+ withCommand("--default-authentication-plugin=mysql_native_password");
setStartupAttempts(3);
}
@@ -100,7 +99,8 @@ public class MySqlContainer extends JdbcDatabaseContainer {
+ getDatabasePort()
+ "/"
+ databaseName
- + additionalUrlParams;
+ + additionalUrlParams
+ + "?useSSL=false&allowPublicKeyRetrieval=true";
}
@Override
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
index 47f0d673c8..4db7538058 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksContainer.java
@@ -19,14 +19,10 @@ package org.apache.inlong.sort.tests.utils;
import org.apache.commons.lang3.StringUtils;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.utility.DockerImageName;
-import java.io.IOException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import static java.util.stream.Collectors.joining;
@@ -103,7 +99,6 @@ public class StarRocksContainer extends GenericContainer {
return getMappedPort(STAR_ROCKS_QUERY_PORT);
}
-
public String getDatabaseName() {
return databaseName;
}
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
new file mode 100644
index 0000000000..3c60a4b3eb
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/StarRocksManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class StarRocksManager {
+
+ //
----------------------------------------------------------------------------------------
+ // StarRocks Variables
+ //
----------------------------------------------------------------------------------------
+ public static final String INTER_CONTAINER_STAR_ROCKS_ALIAS = "starrocks";
+ private static final String NEW_STARROCKS_REPOSITORY = "inlong-starrocks";
+ private static final String NEW_STARROCKS_TAG = "latest";
+ private static final String STAR_ROCKS_IMAGE_NAME =
"starrocks/allin1-ubi:3.0.4";
+ public static final Logger STAR_ROCKS_LOG =
LoggerFactory.getLogger(StarRocksContainer.class);
+ public static void buildStarRocksImage() {
+ GenericContainer oldStarRocks = new
GenericContainer(STAR_ROCKS_IMAGE_NAME);
+ Startables.deepStart(Stream.of(oldStarRocks)).join();
+
oldStarRocks.copyFileToContainer(MountableFile.forClasspathResource("/docker/starrocks/start_fe_be.sh"),
+ "/data/deploy/");
+ try {
+ oldStarRocks.execInContainer("chmod", "+x",
"/data/deploy/start_fe_be.sh");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ oldStarRocks.getDockerClient()
+ .commitCmd(oldStarRocks.getContainerId())
+ .withRepository(NEW_STARROCKS_REPOSITORY)
+ .withTag(NEW_STARROCKS_TAG).exec();
+ oldStarRocks.stop();
+ }
+
+ public static String getNewStarRocksImageName() {
+ return NEW_STARROCKS_REPOSITORY + ":" + NEW_STARROCKS_TAG;
+ }
+
+ public static void initializeStarRocksTable(StarRocksContainer STAR_ROCKS)
{
+ try (Connection conn =
+ DriverManager.getConnection(STAR_ROCKS.getJdbcUrl(),
STAR_ROCKS.getUsername(),
+ STAR_ROCKS.getPassword());
+ Statement stat = conn.createStatement()) {
+ stat.execute("CREATE TABLE IF NOT EXISTS test_output1 (\n"
+ + " id INT NOT NULL,\n"
+ + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ + " description VARCHAR(512)\n"
+ + ")\n"
+ + "PRIMARY KEY(id)\n"
+ + "DISTRIBUTED by HASH(id) PROPERTIES (\"replication_num\"
= \"1\");");
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
index 87a492c496..bbf52cd615 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/my.cnf
@@ -60,4 +60,5 @@ binlog_format = row
# enable gtid mode
gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
+enforce_gtid_consistency = on
+default_authentication_plugin=mysql_native_password
\ No newline at end of file
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
index 9ec4b48bbd..73c4be16a7 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/mysql/setup.sql
@@ -22,4 +22,5 @@
--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'inlong' IDENTIFIED BY 'inlong';
-GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%';
+ALTER USER 'root'@'%' IDENTIFIED WITH 'mysql_native_password' BY 'inlong';
+FLUSH PRIVILEGES;
\ No newline at end of file
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
new file mode 100644
index 0000000000..9f74d54ae7
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
@@ -0,0 +1,38 @@
+CREATE TABLE test_input1 (
+ `id` INT primary key,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'mysql-cdc-inlong',
+ 'hostname' = 'mysql',
+ 'port' = '3306',
+ 'username' = 'root',
+ 'password' = 'inlong',
+ 'database-name' = 'test',
+ 'table-name' = 'test_input1',
+ 'scan.incremental.snapshot.enabled' = 'false',
+ 'jdbc.properties.useSSL' = 'false',
+ 'jdbc.properties.allowPublicKeyRetrieval' = 'true'
+ );
+
+CREATE TABLE test_output1 (
+ `id` INT primary key,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'starrocks-inlong',
+ 'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+ 'load-url'='starrocks:8030',
+ 'database-name'='test',
+ 'table-name' = 'test_output1',
+ 'username' = 'inlong',
+ 'password' = 'inlong',
+ 'sink.properties.format' = 'json',
+ 'sink.properties.strip_outer_array' = 'true',
+ 'sink.buffer-flush.interval-ms' = '1000'
+ );
+
+INSERT INTO test_output1 select * from test_input1;
+
+
+
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
new file mode 100644
index 0000000000..f69bdc20fc
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
@@ -0,0 +1,178 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.15</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-mysql-cdc</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <!-- Debezium dependencies -->
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-mysql-cdc</artifactId>
+ <version>${flink.connector.mysql.cdc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-debezium</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-log4j-appender</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-clients.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mysql</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+ <include>io.debezium:debezium-api</include>
+
<include>io.debezium:debezium-embedded</include>
+
<include>io.debezium:debezium-core</include>
+
<include>io.debezium:debezium-ddl-parser</include>
+
<include>io.debezium:debezium-connector-mysql</include>
+
<include>com.ververica:flink-connector-debezium</include>
+
<include>com.ververica:flink-connector-mysql-cdc</include>
+ <include>org.antlr:antlr4-runtime</include>
+ <include>org.apache.kafka:*</include>
+
<include>mysql:mysql-connector-java</include>
+
<include>com.zendesk:mysql-binlog-connector-java</include>
+ <include>com.fasterxml.*:*</include>
+ <include>com.google.guava:*</include>
+
<include>com.esri.geometry:esri-geometry-api</include>
+ <include>com.zaxxer:HikariCP</include>
+ <!-- Include fixed version 18.0-13.0 of
flink shaded guava -->
+
<include>org.apache.flink:flink-shaded-guava</include>
+ <include>com.google.protobuf:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+ </includes>
+ </filter>
+ <filter>
+ <artifact>org.apache.kafka:*</artifact>
+ <excludes>
+
<exclude>kafka/kafka-version.properties</exclude>
+ <exclude>LICENSE</exclude>
+ <!-- Does not contain anything
relevant.
+ Cites a binary dependency on
jersey, but this is neither reflected in the
+ dependency graph, nor are any
jersey files bundled. -->
+ <exclude>NOTICE</exclude>
+ <exclude>common/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+
<pattern>org.apache.inlong.sort.base</pattern>
+
<shadedPattern>org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base</shadedPattern>
+ </relocation>
+ <relocation>
+
<pattern>org.apache.inlong.sort.cdc.base</pattern>
+
<shadedPattern>org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.cdc.base</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.antlr</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.org.antlr</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.com.fasterxml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.esri.geometry</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.zaxxer</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.com.zaxxer</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
new file mode 100644
index 0000000000..c684cc3ae4
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.MySqlTableSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static
com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
+import static
com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils.PROPERTIES_PREFIX;
+import static
com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils.getJdbcProperties;
+import static
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.flink.util.Preconditions.checkState;
+import static
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
+import static org.apache.inlong.sort.base.Constants.*;
+
+public class MysqlTableFactory implements DynamicTableSourceFactory {
+
+ private static final String IDENTIFIER = "mysql-cdc-inlong";
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(
+ DEBEZIUM_OPTIONS_PREFIX, PROPERTIES_PREFIX);
+ final ReadableConfig config = helper.getOptions();
+ ResolvedSchema physicalSchema =
+
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
+ final String hostname = config.get(HOSTNAME);
+ final String username = config.get(USERNAME);
+ final String password = config.get(PASSWORD);
+ final String databaseName = config.get(DATABASE_NAME);
+ validateRegex(DATABASE_NAME.key(), databaseName);
+ final String tableName = config.get(TABLE_NAME);
+ validateRegex(TABLE_NAME.key(), tableName);
+ int port = config.get(PORT);
+ int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+ int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+ ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+
+ String serverId = validateAndGetServerId(config);
+ StartupOptions startupOptions = getStartupOptions(config);
+ Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+ int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+ int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+ double distributionFactorUpper =
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ double distributionFactorLower =
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ boolean scanNewlyAddedTableEnabled =
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
+ boolean enableParallelRead =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ final String chunkKeyColumn = config.get(CHUNK_KEY_COLUMN);
+ if (enableParallelRead) {
+ validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
+ validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize,
1);
+ validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+ validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+ validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+ validateDistributionFactorUpper(distributionFactorUpper);
+ validateDistributionFactorLower(distributionFactorLower);
+ }
+
+ return new MySqlTableSource(physicalSchema,
+ port,
+ hostname,
+ databaseName,
+ tableName,
+ username,
+ password,
+ serverTimeZone,
+ getDebeziumProperties(context.getCatalogTable().getOptions()),
+ serverId,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ startupOptions,
+ scanNewlyAddedTableEnabled,
+ getJdbcProperties(context.getCatalogTable().getOptions()),
+ heartbeatInterval,
+ chunkKeyColumn);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOSTNAME);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(DATABASE_NAME);
+ options.add(TABLE_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PORT);
+ options.add(SERVER_TIME_ZONE);
+ options.add(SERVER_ID);
+ options.add(SCAN_STARTUP_MODE);
+ options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ options.add(CHUNK_META_GROUP_SIZE);
+ options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+ options.add(CONNECT_TIMEOUT);
+ options.add(CONNECTION_POOL_SIZE);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ options.add(CONNECT_MAX_RETRIES);
+ options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ options.add(HEARTBEAT_INTERVAL);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
+ options.add(ROW_KINDS_FILTERED);
+ options.add(AUDIT_KEYS);
+ options.add(GH_OST_DDL_CHANGE);
+ options.add(GH_OST_TABLE_REGEX);
+ options.add(CHUNK_KEY_COLUMN);
+ return options;
+ }
+
+ public static final ConfigOption<String> CHUNK_KEY_COLUMN =
+ ConfigOptions.key("chunk-key-column")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The chunk key column");
+
+ public static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("IP address or hostname of the MySQL
database server.");
+
+ public static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(3306)
+ .withDescription("Integer port number of the MySQL
database server.");
+
+ public static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the MySQL database to use when connecting
to the MySQL database server.");
+
+ public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to use when connecting to the MySQL
database server.");
+
+ public static final ConfigOption<String> DATABASE_NAME =
+ ConfigOptions.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the MySQL server to
monitor.");
+
+ public static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the MySQL database to
monitor.");
+
+ public static final ConfigOption<String> SERVER_TIME_ZONE =
+ ConfigOptions.key("server-time-zone")
+ .stringType()
+ .defaultValue("UTC")
+ .withDescription("The session time zone in database
server.");
+
+ public static final ConfigOption<String> SERVER_ID =
+ ConfigOptions.key("server-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "A numeric ID or a numeric ID range of this
database client, "
+ + "The numeric ID syntax is like '5400',
the numeric ID range syntax "
+ + "is like '5400-5408', The numeric ID
range syntax is recommended when "
+ + "'scan.incremental.snapshot.enabled'
enabled. Every ID must be unique across all "
+ + "currently-running database processes in
the MySQL cluster. This connector"
+ + " joins the MySQL cluster as another
server (with this unique ID) "
+ + "so it can read the binlog. By default,
a random number is generated between"
+ + " 5400 and 6400, though we recommend
setting an explicit value.");
+
+ public static final ConfigOption<Boolean>
SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+ ConfigOptions.key("scan.incremental.snapshot.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Incremental snapshot is a new mechanism to read
snapshot of a table. "
+ + "Compared to the old snapshot mechanism,
the incremental "
+ + "snapshot has many advantages,
including:\n"
+ + "(1) source can be parallel during
snapshot reading, \n"
+ + "(2) source can perform checkpoints in
the chunk "
+ + "granularity during snapshot reading, \n"
+ + "(3) source doesn't need to acquire
global read lock "
+ + "(FLUSH TABLES WITH READ LOCK) before
snapshot reading.\n"
+ + "If you would like the source run in
parallel, each parallel "
+ + "reader should have an unique server id,
"
+ + "so the 'server-id' must be a range like
'5400-6400', "
+ + "and the range must be larger than the
parallelism.");
+
+ public static final ConfigOption<Integer>
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+ ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+ .intType()
+ .defaultValue(8096)
+ .withDescription(
+ "The chunk size (number of rows) of table
snapshot, "
+ + "captured tables are split into multiple
"
+ + "chunks when read the snapshot of
table.");
+
+ public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+ ConfigOptions.key("scan.snapshot.fetch.size")
+ .intType()
+ .defaultValue(1024)
+ .withDescription(
+ "The maximum fetch size for per poll when read
table snapshot.");
+
+ public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+ ConfigOptions.key("connect.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "The maximum time that the connector should wait
after "
+ + "trying to connect to the MySQL database
server before timing out.");
+
+ public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+ ConfigOptions.key("connection.pool.size")
+ .intType()
+ .defaultValue(20)
+ .withDescription("The connection pool size.");
+
+ public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+ ConfigOptions.key("connect.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "The max retry times that the connector should
retry to build "
+ + "MySQL database server connection.");
+
+ public static final ConfigOption<String> SCAN_STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
+ .stringType()
+ .defaultValue("initial")
+ .withDescription(
+ "Optional startup mode for MySQL CDC consumer,
valid "
+ + "enumerations are "
+ + "\"initial\", \"earliest-offset\", "
+ + "\"latest-offset\", \"timestamp\"\n"
+ + "or \"specific-offset\"");
+
+ public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+ ConfigOptions.key("scan.startup.timestamp-millis")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp used in case of \"timestamp\"
startup mode");
+
+ public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+ ConfigOptions.key("heartbeat.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "Optional interval of sending heartbeat event for
tracing the "
+ + "latest available binlog offsets");
+
+ public static final ConfigOption<String> ROW_KINDS_FILTERED =
+ ConfigOptions.key("row-kinds-filtered")
+ .stringType()
+ .defaultValue("+I&-U&+U&-D")
+ .withDescription("row kinds to be filtered,"
+ + " here filtered means keep the data of certain
row kind"
+ + "the format follows rowKind1&rowKind2, supported
row kinds are "
+ + "\"+I\" represents INSERT.\n"
+ + "\"-U\" represents UPDATE_BEFORE.\n"
+ + "\"+U\" represents UPDATE_AFTER.\n"
+ + "\"-D\" represents DELETE.");
+
+ //
----------------------------------------------------------------------------
+ // experimental options, won't add them to documentation
+ //
----------------------------------------------------------------------------
+ @Experimental
+ public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+ ConfigOptions.key("chunk-meta.group.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "The group size of chunk meta, if the meta size
exceeds the "
+ + "group size, the meta will be will be
divided into multiple groups.");
+
+ @Experimental
+ public static final ConfigOption<Double>
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+ ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
+ .doubleType()
+ .defaultValue(1000.0d)
+ .withDescription(
+ "The upper bound of split key distribution factor.
The distribution "
+ + "factor is used to determine whether the"
+ + " table is evenly distribution or not."
+ + " The table chunks would use evenly
calculation optimization "
+ + "when the data distribution is even,"
+ + " and the query MySQL for splitting
would happen when it is uneven."
+ + " The distribution factor could be
calculated by (MAX(id) - "
+ + "MIN(id) + 1) / rowCount.");
+
+ @Experimental
+ public static final ConfigOption<Double>
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+ ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
+ .doubleType()
+ .defaultValue(0.05d)
+ .withDescription(
+ "The lower bound of split key distribution factor.
The distribution "
+ + "factor is used to determine whether the"
+ + " table is evenly distribution or not."
+ + " The table chunks would use evenly
calculation optimization "
+ + "when the data distribution is even,"
+ + " and the query MySQL for splitting
would happen when it is uneven."
+ + " The distribution factor could be
calculated by (MAX(id) - "
+ + "MIN(id) + 1) / rowCount.");
+
+ @Experimental
+ public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+ ConfigOptions.key("scan.newly-added-table.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether capture the scan the newly added tables
or not, by default is false.");
+
+ private void validateRegex(String optionName, String regex) {
+ try {
+ Pattern.compile(regex);
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "The %s '%s' is not a valid regular expression",
optionName, regex),
+ e);
+ }
+ }
+
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+ private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST =
"earliest-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_LATEST =
"latest-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET =
"specific-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP =
"timestamp";
+
+ private static StartupOptions getStartupOptions(ReadableConfig config) {
+ String modeString = config.get(SCAN_STARTUP_MODE);
+
+ switch (modeString.toLowerCase()) {
+ case SCAN_STARTUP_MODE_VALUE_INITIAL:
+ return StartupOptions.initial();
+
+ case SCAN_STARTUP_MODE_VALUE_LATEST:
+ return StartupOptions.latest();
+
+ case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+ return StartupOptions.earliest();
+
+ case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
+ validateSpecificOffset(config);
+ return getSpecificOffset(config);
+
+ case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+ return
StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
+
+ default:
+ throw new ValidationException(
+ String.format(
+ "Invalid value for option '%s'. Supported
values are [%s, %s], but was: %s",
+ SCAN_STARTUP_MODE.key(),
+ SCAN_STARTUP_MODE_VALUE_INITIAL,
+ SCAN_STARTUP_MODE_VALUE_LATEST,
+ modeString));
+ }
+ }
+
+ private static void validateSpecificOffset(ReadableConfig config) {
+ Optional<String> gtidSet = config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
+ Optional<String> binlogFilename = config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ Optional<Long> binlogPosition = config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ if (!gtidSet.isPresent() && !(binlogFilename.isPresent() &&
binlogPosition.isPresent())) {
+ throw new ValidationException(
+ String.format(
+ "Unable to find a valid binlog offset. Either %s,
or %s and %s are required.",
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(),
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(),
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key()));
+ }
+ }
+
+ private static StartupOptions getSpecificOffset(ReadableConfig config) {
+ BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+
+ // GTID set
+ config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
+ .ifPresent(offsetBuilder::setGtidSet);
+
+ // Binlog file + pos
+ Optional<String> binlogFilename =
config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ Optional<Long> binlogPosition =
config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ if (binlogFilename.isPresent() && binlogPosition.isPresent()) {
+ offsetBuilder.setBinlogFilePosition(binlogFilename.get(),
binlogPosition.get());
+ } else {
+ offsetBuilder.setBinlogFilePosition("", 0);
+ }
+
+ config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+ .ifPresent(offsetBuilder::setSkipEvents);
+ config.getOptional(
+ MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+ .ifPresent(offsetBuilder::setSkipRows);
+ return StartupOptions.specificOffset(offsetBuilder.build());
+ }
+
+ private void validateDistributionFactorLower(double
distributionFactorLower) {
+ checkState(
+ doubleCompare(distributionFactorLower, 0.0d) >= 0
+ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+ String.format(
+ "The value of option '%s' must between %s and %s
inclusively, but is %s",
+ SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+ 0.0d,
+ 1.0d,
+ distributionFactorLower));
+ }
+
+ private void validateDistributionFactorUpper(double
distributionFactorUpper) {
+ checkState(
+ doubleCompare(distributionFactorUpper, 1.0d) >= 0,
+ String.format(
+ "The value of option '%s' must larger than or equals
%s, but is %s",
+ SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+ 1.0d,
+ distributionFactorUpper));
+ }
+
+ private void validateIntegerOption(
+ ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+ checkState(
+ optionValue > exclusiveMin,
+ String.format(
+ "The value of option '%s' must larger than %d, but is
%d",
+ option.key(), exclusiveMin, optionValue));
+ }
+
+ private String validateAndGetServerId(ReadableConfig configuration) {
+ final String serverIdValue =
configuration.get(MySqlSourceOptions.SERVER_ID);
+ if (serverIdValue != null) {
+ // validation
+ try {
+ ServerIdRange.from(serverIdValue);
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "The value of option 'server-id' is invalid:
'%s'", serverIdValue),
+ e);
+ }
+ }
+ return serverIdValue;
+ }
+
+ public static final ConfigOption<String> INLONG_METRIC =
+ ConfigOptions.key("inlong.metric.labels")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("INLONG metric labels, format is
'key1=value1&key2=value2',"
+ + "default is
'groupId=xxx&streamId=xxx&nodeId=xxx'");
+
+ public static final ConfigOption<String> INLONG_AUDIT =
+ ConfigOptions.key(METRICS_AUDIT_PROXY_HOSTS_KEY)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Audit proxy host address for reporting
audit metrics. \n"
+ + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+
+ public static final ConfigOption<String> AUDIT_KEYS =
+ ConfigOptions.key("metrics.audit.key")
+ .stringType()
+ .defaultValue("")
+ .withDescription("Audit keys for metrics collecting");
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..da4dc53894
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.mysql.MysqlTableFactory
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 6756f785d0..73a81bf262 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>postgres-cdc</module>
<module>starrocks</module>
+ <module>mysql-cdc</module>
<module>iceberg</module>
</modules>
diff --git
a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
index 456b556002..5c3bab289a 100644
---
a/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
+++
b/inlong-sort/sort-formats/format-json-v1.15/src/main/java/org/apache/inlong/sort/formats/json/utils/FormatJsonUtil.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import java.util.Map;
+
import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_PRECISION;
import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.DEFAULT_DECIMAL_SCALE;
import static
org.apache.inlong.sort.protocol.constant.DataTypeConstants.ORACLE_TIMESTAMP_TIME_ZONE;