This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 6767d85d9 [FLINK-38279][mysql] Add SnapshotPendingSplitsState to MySQL
checkpoint state during restoration
6767d85d9 is described below
commit 6767d85d93e0f38299536b020da9f3053d90acf1
Author: Dustin Washington <[email protected]>
AuthorDate: Fri Nov 14 06:57:10 2025 -0500
[FLINK-38279][mysql] Add SnapshotPendingSplitsState to MySQL checkpoint
state during restoration
This closes #4127.
Co-authored-by: yuxiqian <[email protected]>
---
.../cdc/connectors/mysql/source/MySqlSource.java | 8 +++
.../tests/migration/YamlJobMigrationITCase.java | 75 ++++++++++++++++++++++
.../udf/examples/java/ThrottlerFunctionClass.java | 32 +++++++++
3 files changed, 115 insertions(+)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index cb06cc45a..9cd40f015 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -38,6 +38,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
+import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import
org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
@@ -251,6 +252,13 @@ public class MySqlSource<T>
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint,
enumContext);
+ } else if (checkpoint instanceof SnapshotPendingSplitsState) {
+ splitAssigner =
+ new MySqlSnapshotSplitAssigner(
+ sourceConfig,
+ enumContext.currentParallelism(),
+ (SnapshotPendingSplitsState) checkpoint,
+ enumContext);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner(
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index 2a875d9f7..82a0a9ff3 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.cdc.pipeline.tests.migration;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
@@ -32,6 +34,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@@ -276,6 +279,78 @@ class YamlJobMigrationITCase extends
PipelineTestEnvironment {
cancelJob(newJobID);
}
+ @ParameterizedTest(name = "{0} -> SNAPSHOT")
+ @EnumSource(names = {"SNAPSHOT"})
+ void
testRestartingJobFromSavepointInSnapshotMode(TarballFetcher.CdcVersion
migrateFromVersion)
+ throws Exception {
+ TarballFetcher.fetch(jobManager, migrateFromVersion);
+ runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+ Path udfJar = TestUtils.getResource("udf-examples.jar");
+ LOG.info("Successfully fetched CDC {}.", migrateFromVersion);
+ String content =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: %d\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + " scan.startup.mode: snapshot\n"
+ + " scan.incremental.snapshot.chunk.size: 2\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "\n"
+ + "transform:\n"
+ + " - source-table: %s.\\.*\n"
+ + " projection: \\*, throttle(id, 10) AS
throttled\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d\n"
+ + " user-defined-function:\n"
+ + " - name: throttle\n"
+ + " classpath:
org.apache.flink.cdc.udf.examples.java.ThrottlerFunctionClass\n",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MySqlContainer.MYSQL_PORT,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ mysqlInventoryDatabase.getDatabaseName(),
+ parallelism);
+ JobID jobID = submitPipelineJob(migrateFromVersion, content, udfJar);
+ Assertions.assertThat(jobID).isNotNull();
+ LOG.info("Submitted Job ID is {} ", jobID);
+ waitUntilJobState(Duration.ofSeconds(30), JobStatus.RUNNING);
+
+ String savepointPath = stopJobWithSavepoint(jobID);
+ LOG.info("Stopped Job {} and created a savepoint at {}.", jobID,
savepointPath);
+
+ JobID newJobID = submitPipelineJob(content, savepointPath, true,
udfJar);
+ LOG.info("Reincarnated Job {} has been submitted successfully.",
newJobID);
+ validateResult(
+ dbNameFormatter,
+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id`
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address`
VARCHAR(1024),`phone_number` VARCHAR(512),`throttled` STRING}, primaryKeys=id,
options=()}",
+ "CreateTableEvent{tableId=%s.products, schema=columns={`id`
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description`
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c`
STRING,`throttled` STRING}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101,
user_1, Shanghai, 123567891234, throttled_101], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102,
user_2, Shanghai, 123567891234, throttled_102], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103,
user_3, Shanghai, 123567891234, throttled_103], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104,
user_4, Shanghai, 123567891234, throttled_104], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[101,
scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"},
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, throttled_101],
op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[102,
car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"},
{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, throttled_102],
op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8, red, {\"key3\": \"value3\"},
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, throttled_103],
op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[104,
hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"},
{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, throttled_104],
op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[105,
hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"},
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, throttled_105],
op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[106,
hammer, 16oz carpenter's hammer, 1.0, null, null, null, throttled_106],
op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[107,
rocks, box of assorted rocks, 5.3, null, null, null, throttled_107], op=INSERT,
meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[108,
jacket, water resistent black wind breaker, 0.1, null, null, null,
throttled_108], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[109,
spare tire, 24 inch spare tire, 22.2, null, null, null, throttled_109],
op=INSERT, meta=()}");
+
+ LOG.info("Snapshot stage finished successfully.");
+ }
+
private void generateIncrementalEventsPhaseOne() {
executeMySqlStatements(
mysqlInventoryDatabase,
diff --git
a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java
b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java
new file mode 100644
index 000000000..aef1ab95e
--- /dev/null
+++
b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java;
+
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for slowing down the pipeline deliberately. */
+public class ThrottlerFunctionClass implements UserDefinedFunction {
+ public String eval(Object o, int seconds) {
+ try {
+ Thread.sleep(seconds * 1000L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return "throttled_" + o;
+ }
+}