This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 085684b77 [FLINK-36811][mysql] MySQL cdc setIsProcessingBacklog in snapshot phase and exit when snapshot phase finished (#3793)
085684b77 is described below

commit 085684b773c9d8549a70b7f3060ea07ee86cabd2
Author: Shawn Huang <hx0...@gmail.com>
AuthorDate: Mon Jan 13 14:56:05 2025 +0800

    [FLINK-36811][mysql] MySQL cdc setIsProcessingBacklog in snapshot phase and exit when snapshot phase finished (#3793)
---
 .../cdc/connectors/mysql/source/MySqlSource.java   |  6 +-
 .../source/assigners/MySqlHybridSplitAssigner.java | 18 ++++--
 .../assigners/MySqlSnapshotSplitAssigner.java      | 50 ++++++++++------
 .../debezium/reader/BinlogSplitReaderTest.java     |  7 ++-
 .../debezium/reader/SnapshotSplitReaderTest.java   |  7 ++-
 .../assigners/MySqlHybridSplitAssignerTest.java    | 66 +++++++++++++++++++++-
 .../assigners/MySqlSnapshotSplitAssignerTest.java  | 19 ++++++-
 .../mysql/source/reader/MySqlSourceReaderTest.java |  7 ++-
 .../MockMySqlSplitEnumeratorEnumeratorContext.java | 40 +++++++++++++
 .../connectors/mysql/testutils/MetricsUtils.java   | 27 +++++++++
 10 files changed, 216 insertions(+), 31 deletions(-)
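
For context: this patch threads the enumerator context into MySqlSnapshotSplitAssigner so that Flink's processing-backlog signal is raised while the initial snapshot splits are being assigned and cleared once every snapshot split has finished. Below is a minimal, hypothetical sketch of that toggle pattern only; BacklogSignal and SnapshotBacklogToggle are illustrative stand-ins rather than types from this patch, with BacklogSignal mimicking SplitEnumeratorContext#setIsProcessingBacklog and a simple counter replacing the assigner's real split bookkeeping.

    // Illustrative stand-in for SplitEnumeratorContext#setIsProcessingBacklog.
    interface BacklogSignal {
        void setIsProcessingBacklog(boolean isProcessingBacklog);
    }

    // Sketch of the toggle added by this patch: flag on when the initial snapshot
    // phase starts, flag off once all snapshot splits have reported finished.
    class SnapshotBacklogToggle {
        private final BacklogSignal enumeratorContext;
        private final boolean initialAssigning; // stands in for AssignerStatus.INITIAL_ASSIGNING
        private int unfinishedSnapshotSplits;

        SnapshotBacklogToggle(BacklogSignal enumeratorContext, boolean initialAssigning, int splitCount) {
            this.enumeratorContext = enumeratorContext;
            this.initialAssigning = initialAssigning;
            this.unfinishedSnapshotSplits = splitCount;
        }

        void open() {
            // Starting the snapshot from scratch: mark the source as processing backlog.
            if (initialAssigning) {
                enumeratorContext.setIsProcessingBacklog(true);
            }
        }

        void onFinishedSplit() {
            // Last snapshot split finished: clear the flag before the binlog phase begins.
            if (--unfinishedSnapshotSplits == 0) {
                enumeratorContext.setIsProcessingBacklog(false);
            }
        }
    }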

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index 47c3af92b..421e1b7a9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -208,7 +208,8 @@ public class MySqlSource<T>
                                 sourceConfig,
                                 enumContext.currentParallelism(),
                                 new ArrayList<>(),
-                                isTableIdCaseSensitive);
+                                isTableIdCaseSensitive,
+                                enumContext);
             } catch (Exception e) {
                 throw new FlinkRuntimeException(
                         "Failed to discover captured tables for enumerator", e);
@@ -233,7 +234,8 @@ public class MySqlSource<T>
                     new MySqlHybridSplitAssigner(
                             sourceConfig,
                             enumContext.currentParallelism(),
-                            (HybridPendingSplitsState) checkpoint);
+                            (HybridPendingSplitsState) checkpoint,
+                            enumContext);
         } else if (checkpoint instanceof BinlogPendingSplitsState) {
             splitAssigner =
                     new MySqlBinlogSplitAssigner(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
index fb6546879..d66e6f3da 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.mysql.source.assigners;
 
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
@@ -59,11 +60,16 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
             MySqlSourceConfig sourceConfig,
             int currentParallelism,
             List<TableId> remainingTables,
-            boolean isTableIdCaseSensitive) {
+            boolean isTableIdCaseSensitive,
+            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
         this(
                 sourceConfig,
                 new MySqlSnapshotSplitAssigner(
-                        sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
+                        sourceConfig,
+                        currentParallelism,
+                        remainingTables,
+                        isTableIdCaseSensitive,
+                        enumeratorContext),
                 false,
                 sourceConfig.getSplitMetaGroupSize());
     }
@@ -71,11 +77,15 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
     public MySqlHybridSplitAssigner(
             MySqlSourceConfig sourceConfig,
             int currentParallelism,
-            HybridPendingSplitsState checkpoint) {
+            HybridPendingSplitsState checkpoint,
+            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
         this(
                 sourceConfig,
                 new MySqlSnapshotSplitAssigner(
-                        sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
+                        sourceConfig,
+                        currentParallelism,
+                        checkpoint.getSnapshotPendingSplits(),
+                        enumeratorContext),
                 checkpoint.isBinlogSplitAssigned(),
                 sourceConfig.getSplitMetaGroupSize());
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 0382cf6e7..f65d96c60 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.mysql.source.assigners;
 
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
 import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
 import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
@@ -79,6 +80,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
     private final int currentParallelism;
     private final List<TableId> remainingTables;
     private final boolean isRemainingTablesCheckpointed;
+    private final SplitEnumeratorContext<MySqlSplit> enumeratorContext;
 
     private final MySqlPartition partition;
     private final Object lock = new Object();
@@ -95,7 +97,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
             MySqlSourceConfig sourceConfig,
             int currentParallelism,
             List<TableId> remainingTables,
-            boolean isTableIdCaseSensitive) {
+            boolean isTableIdCaseSensitive,
+            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
         this(
                 sourceConfig,
                 currentParallelism,
@@ -108,13 +111,15 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 remainingTables,
                 isTableIdCaseSensitive,
                 true,
-                ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
+                ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
+                enumeratorContext);
     }
 
     public MySqlSnapshotSplitAssigner(
             MySqlSourceConfig sourceConfig,
             int currentParallelism,
-            SnapshotPendingSplitsState checkpoint) {
+            SnapshotPendingSplitsState checkpoint,
+            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
         this(
                 sourceConfig,
                 currentParallelism,
@@ -127,7 +132,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 checkpoint.getRemainingTables(),
                 checkpoint.isTableIdCaseSensitive(),
                 checkpoint.isRemainingTablesCheckpointed(),
-                checkpoint.getChunkSplitterState());
+                checkpoint.getChunkSplitterState(),
+                enumeratorContext);
     }
 
     private MySqlSnapshotSplitAssigner(
@@ -142,7 +148,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
             List<TableId> remainingTables,
             boolean isTableIdCaseSensitive,
             boolean isRemainingTablesCheckpointed,
-            ChunkSplitterState chunkSplitterState) {
+            ChunkSplitterState chunkSplitterState,
+            SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
         this.sourceConfig = sourceConfig;
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
@@ -168,10 +175,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState);
         this.partition =
                 new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
+        this.enumeratorContext = enumeratorContext;
     }
 
     @Override
     public void open() {
+        shouldEnterProcessingBacklog();
         chunkSplitter.open();
         discoveryCaptureTables();
         captureNewlyAddedTables();
@@ -397,17 +406,20 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
     @Override
     public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
         this.splitFinishedOffsets.putAll(splitFinishedOffsets);
-        if (allSnapshotSplitsFinished()
-                && AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
-            // Skip the waiting checkpoint when current parallelism is 1 which means we do not need
-            // to care about the global output data order of snapshot splits and binlog split.
-            if (currentParallelism == 1) {
-                assignerStatus = assignerStatus.onFinish();
-                LOG.info(
-                        "Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
-            } else {
-                LOG.info(
-                        "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
+        if (allSnapshotSplitsFinished()) {
+            enumeratorContext.setIsProcessingBacklog(false);
+            if (AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
+                // Skip the waiting checkpoint when current parallelism is 1 which means we do not
+                // need
+                // to care about the global output data order of snapshot splits and binlog split.
+                if (currentParallelism == 1) {
+                    assignerStatus = assignerStatus.onFinish();
+                    LOG.info(
+                            "Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
+                } else {
+                    LOG.info(
+                            "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
+                }
             }
         }
     }
@@ -607,4 +619,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
         }
         return new MySqlChunkSplitter(mySqlSchema, sourceConfig);
     }
+
+    private void shouldEnterProcessingBacklog() {
+        if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING) {
+            enumeratorContext.setIsProcessingBacklog(true);
+        }
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index f9c070d96..17ade8f0f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -86,6 +86,7 @@ import static org.apache.flink.cdc.connectors.mysql.MySqlTestUtils.assertContain
 import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -1182,7 +1183,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
 
         final MySqlSnapshotSplitAssigner assigner =
                 new MySqlSnapshotSplitAssigner(
-                        sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
+                        sourceConfig,
+                        DEFAULT_PARALLELISM,
+                        remainingTables,
+                        false,
+                        getMySqlSplitEnumeratorContext());
         assigner.open();
         List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
         while (true) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
index 147fe441a..2eb7f3b34 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -629,7 +630,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
             MySqlSourceConfig sourceConfig, List<TableId> remainingTables) {
         final MySqlSnapshotSplitAssigner assigner =
                 new MySqlSnapshotSplitAssigner(
-                        sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
+                        sourceConfig,
+                        DEFAULT_PARALLELISM,
+                        remainingTables,
+                        false,
+                        getMySqlSplitEnumeratorContext());
         assigner.open();
         List<MySqlSplit> mySqlSplitList = new ArrayList<>();
         while (true) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
index 134f426be..3521a3506 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
+import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.table.api.DataTypes;
@@ -50,6 +51,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -109,8 +112,11 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
                         ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
         HybridPendingSplitsState checkpoint =
                 new HybridPendingSplitsState(snapshotPendingSplitsState, false);
+        MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
+                getMySqlSplitEnumeratorContext();
         final MySqlHybridSplitAssigner assigner =
-                new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
+                new MySqlHybridSplitAssigner(
+                        configuration, DEFAULT_PARALLELISM, checkpoint, enumeratorContext);
 
         // step 2. Get the MySqlBinlogSplit after all snapshot splits finished
         Optional<MySqlSplit> binlogSplit = assigner.getNext();
@@ -152,7 +158,12 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
 
         // Create and initialize assigner
         MySqlHybridSplitAssigner assigner =
-                new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
+                new MySqlHybridSplitAssigner(
+                        sourceConfig,
+                        1,
+                        new ArrayList<>(),
+                        false,
+                        getMySqlSplitEnumeratorContext());
         assigner.open();
 
         // Get all snapshot splits
@@ -201,6 +212,57 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
                 .createConfig(0);
     }
 
+    @Test
+    public void testSetProcessingBacklog() {
+        final String captureTable = "customers";
+        MySqlSourceConfig configuration = getConfig(new String[] {captureTable});
+        MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
+                getMySqlSplitEnumeratorContext();
+        final MySqlHybridSplitAssigner assigner =
+                new MySqlHybridSplitAssigner(
+                        configuration,
+                        DEFAULT_PARALLELISM,
+                        new ArrayList<>(),
+                        false,
+                        enumeratorContext);
+        assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
+        assigner.open();
+        assertThat(enumeratorContext.isProcessingBacklog()).isTrue();
+        // Get all snapshot splits
+        List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);
+        Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
+        int i = 0;
+        for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) {
+            BinlogOffset binlogOffset =
+                    BinlogOffset.builder().setBinlogFilePosition("foo", i++).build();
+            finishedOffsets.put(snapshotSplit.splitId(), binlogOffset);
+        }
+        assigner.onFinishedSplits(finishedOffsets);
+        assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
+        assigner.close();
+    }
+
+    private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
+        String[] captureTableIds =
+                Arrays.stream(captureTables)
+                        .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
+                        .toArray(String[]::new);
+
+        return new MySqlSourceConfigFactory()
+                .startupOptions(StartupOptions.initial())
+                .databaseList(customerDatabase.getDatabaseName())
+                .tableList(captureTableIds)
+                .hostname(MYSQL_CONTAINER.getHost())
+                .port(MYSQL_CONTAINER.getDatabasePort())
+                .username(customerDatabase.getUsername())
+                .password(customerDatabase.getPassword())
+                .serverTimeZone(ZoneId.of("UTC").toString());
+    }
+
+    private MySqlSourceConfig getConfig(String[] captureTables) {
+        return getConfigFactory(captureTables).createConfig(0);
+    }
+
     private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
         List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
         while (true) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index de875a0ed..7f16cab3f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -475,7 +476,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
 
         final MySqlSnapshotSplitAssigner assigner =
                 new MySqlSnapshotSplitAssigner(
-                        configuration, DEFAULT_PARALLELISM, new ArrayList<>(), false);
+                        configuration,
+                        DEFAULT_PARALLELISM,
+                        new ArrayList<>(),
+                        false,
+                        getMySqlSplitEnumeratorContext());
 
         assertTrue(assigner.needToDiscoveryTables());
         assigner.open();
@@ -549,7 +554,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
                         .collect(Collectors.toList());
         final MySqlSnapshotSplitAssigner assigner =
                 new MySqlSnapshotSplitAssigner(
-                        configuration, DEFAULT_PARALLELISM, remainingTables, false);
+                        configuration,
+                        DEFAULT_PARALLELISM,
+                        remainingTables,
+                        false,
+                        getMySqlSplitEnumeratorContext());
         return getSplitsFromAssigner(assigner);
     }
 
@@ -642,7 +651,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
                         true,
                         ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
         final MySqlSnapshotSplitAssigner assigner =
-                new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
+                new MySqlSnapshotSplitAssigner(
+                        configuration,
+                        DEFAULT_PARALLELISM,
+                        checkpoint,
+                        getMySqlSplitEnumeratorContext());
         return getSplitsFromAssigner(assigner);
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 2f674b612..9dff0446c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -99,6 +99,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isH
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
 import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
@@ -404,7 +405,8 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
                         sourceConfig,
                         DEFAULT_PARALLELISM,
                         tableNames.stream().map(TableId::parse).collect(Collectors.toList()),
-                        false);
+                        false,
+                        getMySqlSplitEnumeratorContext());
         assigner.open();
         List<MySqlSplit> splits = new ArrayList<>();
         MySqlSnapshotSplit split = (MySqlSnapshotSplit) assigner.getNext().get();
@@ -459,7 +461,8 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
                         sourceConfig,
                         DEFAULT_PARALLELISM,
                         Collections.singletonList(TableId.parse(tableName)),
-                        false);
+                        false,
+                        getMySqlSplitEnumeratorContext());
         assigner.open();
         MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit) assigner.getNext().get();
         // should contain only one split
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java
new file mode 100644
index 000000000..fe547dad0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
+
+/** A mock enumerator context to record isProcessingBacklog. */
+public class MockMySqlSplitEnumeratorEnumeratorContext
+        extends MockSplitEnumeratorContext<MySqlSplit> {
+    private boolean isProcessingBacklog = false;
+
+    public MockMySqlSplitEnumeratorEnumeratorContext(int parallelism) {
+        super(parallelism);
+    }
+
+    @Override
+    public void setIsProcessingBacklog(boolean isProcessingBacklog) {
+        this.isProcessingBacklog = isProcessingBacklog;
+    }
+
+    public boolean isProcessingBacklog() {
+        return isProcessingBacklog;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java
new file mode 100644
index 000000000..f4d10b472
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.testutils;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
+
+/** The test utils for metrics. */
+public class MetricsUtils {
+    public static MockMySqlSplitEnumeratorEnumeratorContext getMySqlSplitEnumeratorContext() {
+        return new MockMySqlSplitEnumeratorEnumeratorContext(1);
+    }
+}
