This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6767d85d9 [FLINK-38279][mysql] Add SnapshotPendingSplitsState to MySQL 
checkpoint state during restoration
6767d85d9 is described below

commit 6767d85d93e0f38299536b020da9f3053d90acf1
Author: Dustin Washington <[email protected]>
AuthorDate: Fri Nov 14 06:57:10 2025 -0500

    [FLINK-38279][mysql] Add SnapshotPendingSplitsState to MySQL checkpoint 
state during restoration
    
    This closes #4127.
    
    
    Co-authored-by: yuxiqian <[email protected]>
---
 .../cdc/connectors/mysql/source/MySqlSource.java   |  8 +++
 .../tests/migration/YamlJobMigrationITCase.java    | 75 ++++++++++++++++++++++
 .../udf/examples/java/ThrottlerFunctionClass.java  | 32 +++++++++
 3 files changed, 115 insertions(+)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index cb06cc45a..9cd40f015 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendin
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
+import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import 
org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
@@ -251,6 +252,13 @@ public class MySqlSource<T>
                             enumContext.currentParallelism(),
                             (HybridPendingSplitsState) checkpoint,
                             enumContext);
+        } else if (checkpoint instanceof SnapshotPendingSplitsState) {
+            splitAssigner =
+                    new MySqlSnapshotSplitAssigner(
+                            sourceConfig,
+                            enumContext.currentParallelism(),
+                            (SnapshotPendingSplitsState) checkpoint,
+                            enumContext);
         } else if (checkpoint instanceof BinlogPendingSplitsState) {
             splitAssigner =
                     new MySqlBinlogSplitAssigner(
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
index 2a875d9f7..82a0a9ff3 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java
@@ -18,6 +18,8 @@
 package org.apache.flink.cdc.pipeline.tests.migration;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
@@ -32,6 +34,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -276,6 +279,78 @@ class YamlJobMigrationITCase extends 
PipelineTestEnvironment {
         cancelJob(newJobID);
     }
 
+    /**
+     * Verifies that a pipeline started with {@code scan.startup.mode: snapshot} can be stopped with
+     * a savepoint while it is still running, and that a new job restored from that savepoint
+     * produces the expected snapshot records.
+     *
+     * <p>The {@code throttle} UDF sleeps 10 seconds per row and the snapshot chunk size is 2, so
+     * each table is split into several snapshot splits that are consumed slowly. The intent is
+     * presumably that the savepoint is taken while snapshot splits are still pending, which is
+     * the state restored by the {@code SnapshotPendingSplitsState} branch in {@code MySqlSource}.
+     * NOTE(review): confirm the timing reliably leaves splits pending when the savepoint is taken.
+     *
+     * @param migrateFromVersion CDC version used to submit the original job; the
+     *     {@code @EnumSource} filter restricts it to {@code SNAPSHOT}
+     */
+    @ParameterizedTest(name = "{0} -> SNAPSHOT")
+    @EnumSource(names = {"SNAPSHOT"})
+    void 
testRestartingJobFromSavepointInSnapshotMode(TarballFetcher.CdcVersion 
migrateFromVersion)
+            throws Exception {
+        TarballFetcher.fetch(jobManager, migrateFromVersion);
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/");
+        Path udfJar = TestUtils.getResource("udf-examples.jar");
+        LOG.info("Successfully fetched CDC {}.", migrateFromVersion);
+        // Snapshot-only MySQL source; every row gets a `throttled` column produced by the
+        // throttle(id, 10) UDF, which sleeps 10 s per call.
+        String content =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "  scan.startup.mode: snapshot\n"
+                                + "  scan.incremental.snapshot.chunk.size: 2\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.\\.*\n"
+                                + "    projection: \\*, throttle(id, 10) AS 
throttled\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n"
+                                + "  user-defined-function:\n"
+                                + "    - name: throttle\n"
+                                + "      classpath: 
org.apache.flink.cdc.udf.examples.java.ThrottlerFunctionClass\n",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MySqlContainer.MYSQL_PORT,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        parallelism);
+        JobID jobID = submitPipelineJob(migrateFromVersion, content, udfJar);
+        Assertions.assertThat(jobID).isNotNull();
+        LOG.info("Submitted Job ID is {} ", jobID);
+        // Wait (up to 30 s) until the job is RUNNING, so the savepoint below is taken mid-run.
+        waitUntilJobState(Duration.ofSeconds(30), JobStatus.RUNNING);
+
+        String savepointPath = stopJobWithSavepoint(jobID);
+        LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, 
savepointPath);
+
+        // Restore a new job from the savepoint using the same pipeline definition and UDF jar.
+        // NOTE(review): the meaning of the `true` argument is defined by the helper; confirm.
+        JobID newJobID = submitPipelineJob(content, savepointPath, true, 
udfJar);
+        LOG.info("Reincarnated Job {} has been submitted successfully.", 
newJobID);
+        // Presumably checks that the sink output contains these events (with %s replaced by
+        // dbNameFormatter); the throttled column is "throttled_<id>" for every row.
+        validateResult(
+                dbNameFormatter,
+                "CreateTableEvent{tableId=%s.customers, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` 
VARCHAR(1024),`phone_number` VARCHAR(512),`throttled` STRING}, primaryKeys=id, 
options=()}",
+                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` 
VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` 
STRING,`throttled` STRING}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[101, 
user_1, Shanghai, 123567891234, throttled_101], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[102, 
user_2, Shanghai, 123567891234, throttled_102], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[103, 
user_3, Shanghai, 123567891234, throttled_103], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.customers, before=[], after=[104, 
user_4, Shanghai, 123567891234, throttled_104], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[101, 
scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, 
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, throttled_101], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[102, 
car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, 
{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, throttled_102], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, throttled_103], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[104, 
hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, 
{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, throttled_104], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[105, 
hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, throttled_105], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[106, 
hammer, 16oz carpenter's hammer, 1.0, null, null, null, throttled_106], 
op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[107, 
rocks, box of assorted rocks, 5.3, null, null, null, throttled_107], op=INSERT, 
meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[108, 
jacket, water resistent black wind breaker, 0.1, null, null, null, 
throttled_108], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[109, 
spare tire, 24 inch spare tire, 22.2, null, null, null, throttled_109], 
op=INSERT, meta=()}");
+
+        // NOTE(review): unlike the preceding migration test, newJobID is not cancelled here;
+        // presumably the snapshot-mode job finishes on its own. Confirm, or cancel it explicitly.
+        LOG.info("Snapshot stage finished successfully.");
+    }
+
     private void generateIncrementalEventsPhaseOne() {
         executeMySqlStatements(
                 mysqlInventoryDatabase,
diff --git 
a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java
 
b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java
new file mode 100644
index 000000000..aef1ab95e
--- /dev/null
+++ 
b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/ThrottlerFunctionClass.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java;
+
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/**
+ * Example UDF that deliberately slows the pipeline down.
+ *
+ * <p>Each {@code eval} call blocks the calling thread for the requested number of seconds. It then
+ * echoes its input with a {@code "throttled_"} prefix. This lets tests keep a job busy long enough
+ * to act on it while it is still running, for example to take a savepoint.
+ */
+public class ThrottlerFunctionClass implements UserDefinedFunction {
+
+    /**
+     * Sleeps for {@code seconds} seconds, then returns {@code "throttled_" + o}.
+     *
+     * @param o value to echo back; {@code null} yields {@code "throttled_null"}
+     * @param seconds how long to block the calling thread, in seconds
+     * @return {@code "throttled_"} followed by the string form of {@code o}
+     * @throws IllegalArgumentException if {@code seconds} is negative (from {@link Thread#sleep})
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
+     *     interrupted; the thread's interrupt status is restored before throwing
+     */
+    public String eval(Object o, int seconds) {
+        try {
+            // Multiply as long so large second counts cannot overflow int milliseconds.
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag cleared by sleep() so code further up the stack can
+            // still see that this thread was interrupted.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        return "throttled_" + o;
+    }
+}

Reply via email to