This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 48ca8623b [FLINK-34634][cdc-base][mysql] Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (#3134)
48ca8623b is described below

commit 48ca8623bb8fa405adb56dbe505dbad10902db89
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Apr 12 10:15:46 2024 +0800

    [FLINK-34634][cdc-base][mysql] Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (#3134)
---
 .../source/assigner/SnapshotSplitAssigner.java     | 21 ++++++++++----
 .../enumerator/IncrementalSourceEnumerator.java    | 32 +++++++++++++++++-----
 .../source/meta/events/StreamSplitMetaEvent.java   | 15 +++++++++-
 .../meta/events/StreamSplitMetaRequestEvent.java   | 10 ++++++-
 .../base/source/meta/split/StreamSplit.java        | 17 +++++++++++-
 .../source/reader/IncrementalSourceReader.java     | 14 ++++++++--
 .../mongodb/source/NewlyAddedTableITCase.java      | 17 +++---------
 .../assigners/MySqlSnapshotSplitAssigner.java      |  1 +
 .../source/enumerator/MySqlSourceEnumerator.java   | 32 +++++++++++++++++-----
 .../mysql/source/events/BinlogSplitMetaEvent.java  | 15 +++++++++-
 .../source/events/BinlogSplitMetaRequestEvent.java | 10 ++++++-
 .../mysql/source/reader/MySqlSourceReader.java     | 16 +++++++++--
 .../mysql/source/split/MySqlBinlogSplit.java       | 16 ++++++++++-
 .../mysql/source/NewlyAddedTableITCase.java        | 15 +++-------
 .../postgres/source/NewlyAddedTableITCase.java     | 15 +++-------
 15 files changed, 181 insertions(+), 65 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index 48c55cd8a..d424e89b7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -39,10 +39,10 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -92,7 +92,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
                 currentParallelism,
                 new ArrayList<>(),
                 new ArrayList<>(),
-                new HashMap<>(),
+                new LinkedHashMap<>(),
                 new HashMap<>(),
                 new HashMap<>(),
                 INITIAL_ASSIGNING,
@@ -143,7 +143,17 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
         this.remainingSplits = remainingSplits;
-        this.assignedSplits = assignedSplits;
+        // When job restore from savepoint, sort the existing tables and newly 
added tables
+        // to let enumerator only send newly added tables' StreamSplitMetaEvent
+        this.assignedSplits =
+                assignedSplits.entrySet().stream()
+                        .sorted(Map.Entry.comparingByKey())
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        Map.Entry::getValue,
+                                        (o, o2) -> o,
+                                        LinkedHashMap::new));
         this.tableSchemas = tableSchemas;
         this.splitFinishedOffsets = splitFinishedOffsets;
         this.assignerStatus = assignerStatus;
@@ -230,6 +240,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
                     tableSchemas
                             .entrySet()
                             .removeIf(schema -> 
tablesToRemove.contains(schema.getKey()));
+                    LOG.info("Enumerator remove tables after restart: {}", 
tablesToRemove);
                     remainingSplits.removeIf(split -> 
tablesToRemove.contains(split.getTableId()));
                     remainingTables.removeAll(tablesToRemove);
                     alreadyProcessedTables.removeIf(tableId -> 
tablesToRemove.contains(tableId));
@@ -303,9 +314,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
                     "The assigner is not ready to offer finished split 
information, this should not be called");
         }
         final List<SchemalessSnapshotSplit> assignedSnapshotSplit =
-                assignedSplits.values().stream()
-                        .sorted(Comparator.comparing(SourceSplitBase::splitId))
-                        .collect(Collectors.toList());
+                new ArrayList<>(assignedSplits.values());
         List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new 
ArrayList<>();
         for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
             Offset finishedOffset = splitFinishedOffsets.get(split.splitId());
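
The hunks above replace the per-call sort in getFinishedSplitInfos with a single sort at restore time, kept in a LinkedHashMap; per the in-diff comment, the intent is that restored splits keep a deterministic order and splits assigned after restore (newly added tables) end up at the tail, so the enumerator only needs to ship the new tables' meta. A minimal, self-contained sketch of that idiom, with plain strings standing in for the real split types and purely illustrative names:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SortedAssignedSplitsSketch {
        public static void main(String[] args) {
            // Assigned splits restored from state arrive in arbitrary HashMap order.
            Map<String, String> restored = new HashMap<>();
            restored.put("db.table_b:0", "split of an existing table");
            restored.put("db.table_a:0", "split of another existing table");

            // Sort once by split id and preserve that order in a LinkedHashMap, so
            // every later iteration (e.g. building finished-split meta groups) sees
            // the same deterministic order across restarts.
            Map<String, String> ordered =
                    restored.entrySet().stream()
                            .sorted(Map.Entry.comparingByKey())
                            .collect(
                                    Collectors.toMap(
                                            Map.Entry::getKey,
                                            Map.Entry::getValue,
                                            (existing, duplicate) -> existing,
                                            LinkedHashMap::new));

            // Splits assigned after restore are simply appended at the end.
            ordered.put("db.newly_added_table:0", "split of a newly added table");

            ordered.forEach((id, split) -> System.out.println(id + " -> " + split));
        }
    }
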
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index af137ab40..05dcc6bd2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -307,8 +307,22 @@ public class IncrementalSourceEnumerator
                             finishedSnapshotSplitInfos, 
sourceConfig.getSplitMetaGroupSize());
         }
         final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-        if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = 
requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator = 
splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > 
totalFinishedSplitSizeOfEnumerator) {
+            LOG.warn(
+                    "Total finished split size of subtask {} is {}, while 
total finished split size of enumerator is only {}. Try to truncate it",
+                    subTask,
+                    totalFinishedSplitSizeOfReader,
+                    totalFinishedSplitSizeOfEnumerator);
+            StreamSplitMetaEvent metadataEvent =
+                    new StreamSplitMetaEvent(
+                            requestEvent.getSplitId(),
+                            requestMetaGroupId,
+                            null,
+                            totalFinishedSplitSizeOfEnumerator);
+            context.sendEventToSourceReader(subTask, metadataEvent);
+        } else if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
             List<FinishedSnapshotSplitInfo> metaToSend =
                     finishedSnapshotSplitMeta.get(requestMetaGroupId);
             StreamSplitMetaEvent metadataEvent =
@@ -317,13 +331,17 @@ public class IncrementalSourceEnumerator
                             requestMetaGroupId,
                             metaToSend.stream()
                                     .map(FinishedSnapshotSplitInfo::serialize)
-                                    .collect(Collectors.toList()));
+                                    .collect(Collectors.toList()),
+                            totalFinishedSplitSizeOfEnumerator);
             context.sendEventToSourceReader(subTask, metadataEvent);
         } else {
-            LOG.error(
-                    "Received invalid request meta group id {}, the invalid 
meta group id range is [0, {}]",
-                    requestMetaGroupId,
-                    finishedSnapshotSplitMeta.size() - 1);
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "The enumerator received invalid request meta 
group id %s, the valid meta group id range is [0, %s]. Total finished split 
size of reader is %s, while the total finished split size of enumerator is %s.",
+                            requestMetaGroupId,
+                            finishedSnapshotSplitMeta.size() - 1,
+                            totalFinishedSplitSizeOfReader,
+                            totalFinishedSplitSizeOfEnumerator));
         }
     }
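
The enumerator change above adds a guard: every meta request now carries the reader's view of the total finished split count, and when a restarted reader reports more finished splits than the enumerator tracks (some tables were removed while the job was down), the enumerator answers with its own, smaller count so the reader can truncate instead of stalling on meta groups that no longer exist. A standalone sketch of that decision logic, with hypothetical names and plain ints in place of the real event classes:

    public class MetaRequestHandlingSketch {

        static String handleMetaRequest(
                int requestMetaGroupId,
                int readerFinishedSplitSize,
                int enumeratorFinishedSplitSize,
                int metaGroupCount) {
            if (readerFinishedSplitSize > enumeratorFinishedSplitSize) {
                // The reader's restored stream split still references removed tables:
                // reply with the enumerator's (authoritative, smaller) size so the
                // reader truncates its finished-split metadata.
                return "truncate to " + enumeratorFinishedSplitSize + " finished splits";
            } else if (requestMetaGroupId < metaGroupCount) {
                return "send meta group " + requestMetaGroupId;
            } else {
                throw new IllegalStateException(
                        "invalid meta group id " + requestMetaGroupId
                                + ", valid range is [0, " + (metaGroupCount - 1) + "]");
            }
        }

        public static void main(String[] args) {
            System.out.println(handleMetaRequest(0, 6, 4, 2)); // reader ahead -> truncate
            System.out.println(handleMetaRequest(1, 4, 4, 2)); // normal path -> send group 1
        }
    }
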
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
index 4bf1922f6..8f9f580e9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceE
 import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
 import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /**
@@ -43,10 +45,17 @@ public class StreamSplitMetaEvent implements SourceEvent {
      */
     private final List<byte[]> metaGroup;
 
-    public StreamSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> 
metaGroup) {
+    private final int totalFinishedSplitSize;
+
+    public StreamSplitMetaEvent(
+            String splitId,
+            int metaGroupId,
+            @Nullable List<byte[]> metaGroup,
+            int totalFinishedSplitSize) {
         this.splitId = splitId;
         this.metaGroupId = metaGroupId;
         this.metaGroup = metaGroup;
+        this.totalFinishedSplitSize = totalFinishedSplitSize;
     }
 
     public String getSplitId() {
@@ -60,4 +69,8 @@ public class StreamSplitMetaEvent implements SourceEvent {
     public List<byte[]> getMetaGroup() {
         return metaGroup;
     }
+
+    public int getTotalFinishedSplitSize() {
+        return totalFinishedSplitSize;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
index 272f1967a..8d657c273 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
@@ -33,9 +33,13 @@ public class StreamSplitMetaRequestEvent implements 
SourceEvent {
     private final String splitId;
     private final int requestMetaGroupId;
 
-    public StreamSplitMetaRequestEvent(String splitId, int requestMetaGroupId) 
{
+    private final int totalFinishedSplitSize;
+
+    public StreamSplitMetaRequestEvent(
+            String splitId, int requestMetaGroupId, int 
totalFinishedSplitSize) {
         this.splitId = splitId;
         this.requestMetaGroupId = requestMetaGroupId;
+        this.totalFinishedSplitSize = totalFinishedSplitSize;
     }
 
     public String getSplitId() {
@@ -45,4 +49,8 @@ public class StreamSplitMetaRequestEvent implements 
SourceEvent {
     public int getRequestMetaGroupId() {
         return requestMetaGroupId;
     }
+
+    public int getTotalFinishedSplitSize() {
+        return totalFinishedSplitSize;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java
index 53bc21d68..f4143364a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java
@@ -21,6 +21,8 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -29,11 +31,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /** The split to describe the change log of database table(s). */
 public class StreamSplit extends SourceSplitBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StreamSplit.class);
     public static final String STREAM_SPLIT_ID = "stream-split";
 
     private final Offset startingOffset;
@@ -179,9 +183,20 @@ public class StreamSplit extends SourceSplitBase {
      */
     public static StreamSplit filterOutdatedSplitInfos(
             StreamSplit streamSplit, Predicate<TableId> currentTableFilter) {
+
+        Set<TableId> tablesToRemove =
+                streamSplit.getFinishedSnapshotSplitInfos().stream()
+                        .filter(i -> !currentTableFilter.test(i.getTableId()))
+                        .map(split -> split.getTableId())
+                        .collect(Collectors.toSet());
+        if (tablesToRemove.isEmpty()) {
+            return streamSplit;
+        }
+
+        LOG.info("Reader remove tables after restart: {}", tablesToRemove);
         List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
                 streamSplit.getFinishedSnapshotSplitInfos().stream()
-                        .filter(i -> currentTableFilter.test(i.getTableId()))
+                        .filter(i -> !tablesToRemove.contains(i.getTableId()))
                         .collect(Collectors.toList());
         Map<TableId, TableChange> previousTableSchemas = 
streamSplit.getTableSchemas();
         Map<TableId, TableChange> newTableSchemas = new HashMap<>();
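
filterOutdatedSplitInfos now collects the removed tables up front, returns the split untouched when there is nothing to drop, and only then rebuilds the finished-split list; the MySqlBinlogSplit change further below follows the same shape. A small standalone sketch of that filtering pattern, using plain strings in place of TableId and FinishedSnapshotSplitInfo:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class FilterOutdatedSplitInfosSketch {
        public static void main(String[] args) {
            List<String> finishedSplitTables =
                    Arrays.asList("db.orders", "db.orders", "db.removed_table");
            // Tables currently captured by the source.
            Predicate<String> currentTableFilter = table -> !table.equals("db.removed_table");

            // Step 1: find tables that are no longer captured.
            Set<String> tablesToRemove =
                    finishedSplitTables.stream()
                            .filter(table -> !currentTableFilter.test(table))
                            .collect(Collectors.toSet());
            if (tablesToRemove.isEmpty()) {
                // Step 2: nothing removed, reuse the restored split as-is.
                System.out.println("nothing to remove, reuse the restored split");
                return;
            }

            // Step 3: drop the finished split infos that belong to removed tables.
            List<String> remaining =
                    finishedSplitTables.stream()
                            .filter(table -> !tablesToRemove.contains(table))
                            .collect(Collectors.toList());
            System.out.println("removed tables: " + tablesToRemove);
            System.out.println("remaining finished split infos: " + remaining);
        }
    }
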
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java
index de0147532..7be9785fd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java
@@ -418,11 +418,20 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
         StreamSplit streamSplit = 
uncompletedStreamSplits.get(metadataEvent.getSplitId());
         if (streamSplit != null) {
             final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+            final int receivedTotalFinishedSplitSize = 
metadataEvent.getTotalFinishedSplitSize();
             final int expectedMetaGroupId =
                     getNextMetaGroupId(
                             streamSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
-            if (receivedMetaGroupId == expectedMetaGroupId) {
+            if (receivedTotalFinishedSplitSize < 
streamSplit.getTotalFinishedSplitSize()) {
+                LOG.warn(
+                        "Source reader {} receives out of bound finished split 
size. The received finished split size is {}, but expected is {}, truncate it",
+                        subtaskId,
+                        receivedTotalFinishedSplitSize,
+                        streamSplit.getTotalFinishedSplitSize());
+                streamSplit = toNormalStreamSplit(streamSplit, 
receivedTotalFinishedSplitSize);
+                uncompletedStreamSplits.put(streamSplit.splitId(), 
streamSplit);
+            } else if (receivedMetaGroupId == expectedMetaGroupId) {
                 Set<String> existedSplitsOfLastGroup =
                         getExistedSplitsOfLastGroup(
                                 streamSplit.getFinishedSnapshotSplitInfos(),
@@ -461,7 +470,8 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
                             streamSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
             StreamSplitMetaRequestEvent splitMetaRequestEvent =
-                    new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId);
+                    new StreamSplitMetaRequestEvent(
+                            splitId, nextMetaGroupId, 
streamSplit.getTotalFinishedSplitSize());
             context.sendSourceEventToCoordinator(splitMetaRequestEvent);
         } else {
             LOG.info("The meta of stream split {} has been collected success", 
splitId);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
index dc86f637a..3b1964856 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
@@ -42,7 +42,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
-import java.lang.reflect.Field;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -948,20 +947,11 @@ public class NewlyAddedTableITCase extends 
MongoDBSourceTestBase {
         // Close sink upsert materialize to show more clear test output.
         Configuration tableConfig = new Configuration();
         tableConfig.setString("table.exec.sink.upsert-materialize", "none");
-        StreamExecutionEnvironment env =
-                
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
         if (finishedSavePointPath != null) {
-            // restore from savepoint
-            // hack for test to visit protected 
TestStreamEnvironment#getConfiguration() method
-            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-            Class<?> clazz =
-                    classLoader.loadClass(
-                            
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
-            Field field = clazz.getDeclaredField("configuration");
-            field.setAccessible(true);
-            Configuration configuration = (Configuration) field.get(env);
-            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
finishedSavePointPath);
+            tableConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
finishedSavePointPath);
         }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
         env.setParallelism(parallelism);
         env.enableCheckpointing(200L);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
@@ -987,6 +977,7 @@ public class NewlyAddedTableITCase extends 
MongoDBSourceTestBase {
                         + " 'password' = '%s',"
                         + " 'database' = '%s',"
                         + " 'collection' = '%s',"
+                        + " 'chunk-meta.group.size' = '2',"
                         + " 'heartbeat.interval.ms' = '100',"
                         + " 'scan.full-changelog' = 'true',"
                         + " 'scan.newly-added-table.enabled' = 'true'"
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 1cefdfe8c..e2dd426a6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -253,6 +253,7 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
                             .entrySet()
                             .removeIf(schema -> 
tablesToRemove.contains(schema.getKey()));
                     remainingSplits.removeIf(split -> 
tablesToRemove.contains(split.getTableId()));
+                    LOG.info("Enumerator remove tables after restart: {}", 
tablesToRemove);
                     remainingTables.removeAll(tablesToRemove);
                     alreadyProcessedTables.removeIf(tableId -> 
tablesToRemove.contains(tableId));
                 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index a31197805..50d7607b6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -298,8 +298,22 @@ public class MySqlSourceEnumerator implements 
SplitEnumerator<MySqlSplit, Pendin
                             finishedSnapshotSplitInfos, 
sourceConfig.getSplitMetaGroupSize());
         }
         final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
-        if (binlogSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = 
requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator = 
splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > 
totalFinishedSplitSizeOfEnumerator) {
+            LOG.warn(
+                    "Total finished split size of subtask {} is {}, while 
total finished split size of Enumerator is only {}. Try to truncate it",
+                    subTask,
+                    totalFinishedSplitSizeOfReader,
+                    totalFinishedSplitSizeOfEnumerator);
+            BinlogSplitMetaEvent metadataEvent =
+                    new BinlogSplitMetaEvent(
+                            requestEvent.getSplitId(),
+                            requestMetaGroupId,
+                            null,
+                            totalFinishedSplitSizeOfEnumerator);
+            context.sendEventToSourceReader(subTask, metadataEvent);
+        } else if (binlogSplitMeta.size() > requestMetaGroupId) {
             List<FinishedSnapshotSplitInfo> metaToSend = 
binlogSplitMeta.get(requestMetaGroupId);
             BinlogSplitMetaEvent metadataEvent =
                     new BinlogSplitMetaEvent(
@@ -307,13 +321,17 @@ public class MySqlSourceEnumerator implements 
SplitEnumerator<MySqlSplit, Pendin
                             requestMetaGroupId,
                             metaToSend.stream()
                                     .map(FinishedSnapshotSplitInfo::serialize)
-                                    .collect(Collectors.toList()));
+                                    .collect(Collectors.toList()),
+                            totalFinishedSplitSizeOfEnumerator);
             context.sendEventToSourceReader(subTask, metadataEvent);
         } else {
-            LOG.error(
-                    "The enumerator received invalid request meta group id {}, 
the valid meta group id range is [0, {}]",
-                    requestMetaGroupId,
-                    binlogSplitMeta.size() - 1);
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "The enumerator received invalid request meta 
group id %s, the valid meta group id range is [0, %s]. Total finished split 
size of reader is %s, while the total finished split size of enumerator is %s.",
+                            requestMetaGroupId,
+                            binlogSplitMeta.size() - 1,
+                            totalFinishedSplitSizeOfReader,
+                            totalFinishedSplitSizeOfEnumerator));
         }
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
index 60688bbd0..2c8fe0b1e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumer
 import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /**
@@ -43,10 +45,17 @@ public class BinlogSplitMetaEvent implements SourceEvent {
      */
     private final List<byte[]> metaGroup;
 
-    public BinlogSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> 
metaGroup) {
+    private final int totalFinishedSplitSize;
+
+    public BinlogSplitMetaEvent(
+            String splitId,
+            int metaGroupId,
+            @Nullable List<byte[]> metaGroup,
+            int totalFinishedSplitSize) {
         this.splitId = splitId;
         this.metaGroupId = metaGroupId;
         this.metaGroup = metaGroup;
+        this.totalFinishedSplitSize = totalFinishedSplitSize;
     }
 
     public String getSplitId() {
@@ -60,4 +69,8 @@ public class BinlogSplitMetaEvent implements SourceEvent {
     public List<byte[]> getMetaGroup() {
         return metaGroup;
     }
+
+    public int getTotalFinishedSplitSize() {
+        return totalFinishedSplitSize;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java
index d3a2b8f33..6ffef8679 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java
@@ -32,9 +32,13 @@ public class BinlogSplitMetaRequestEvent implements 
SourceEvent {
     private final String splitId;
     private final int requestMetaGroupId;
 
-    public BinlogSplitMetaRequestEvent(String splitId, int requestMetaGroupId) 
{
+    private final int totalFinishedSplitSize;
+
+    public BinlogSplitMetaRequestEvent(
+            String splitId, int requestMetaGroupId, int 
totalFinishedSplitSize) {
         this.splitId = splitId;
         this.requestMetaGroupId = requestMetaGroupId;
+        this.totalFinishedSplitSize = totalFinishedSplitSize;
     }
 
     public String getSplitId() {
@@ -44,4 +48,8 @@ public class BinlogSplitMetaRequestEvent implements 
SourceEvent {
     public int getRequestMetaGroupId() {
         return requestMetaGroupId;
     }
+
+    public int getTotalFinishedSplitSize() {
+        return totalFinishedSplitSize;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index 6df83a648..253d9dc62 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -383,7 +383,8 @@ public class MySqlSourceReader<T>
                             binlogSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
             BinlogSplitMetaRequestEvent splitMetaRequestEvent =
-                    new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
+                    new BinlogSplitMetaRequestEvent(
+                            splitId, nextMetaGroupId, 
binlogSplit.getTotalFinishedSplitSize());
             context.sendSourceEventToCoordinator(splitMetaRequestEvent);
         } else {
             LOG.info("Source reader {} collects meta of binlog split success", 
subtaskId);
@@ -395,11 +396,22 @@ public class MySqlSourceReader<T>
         MySqlBinlogSplit binlogSplit = 
uncompletedBinlogSplits.get(metadataEvent.getSplitId());
         if (binlogSplit != null) {
             final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+            final int receivedTotalFinishedSplitSize = 
metadataEvent.getTotalFinishedSplitSize();
             final int expectedMetaGroupId =
                     ChunkUtils.getNextMetaGroupId(
                             binlogSplit.getFinishedSnapshotSplitInfos().size(),
                             sourceConfig.getSplitMetaGroupSize());
-            if (receivedMetaGroupId == expectedMetaGroupId) {
+            if (receivedTotalFinishedSplitSize < 
binlogSplit.getTotalFinishedSplitSize()) {
+                LOG.warn(
+                        "Source reader {} receives out of bound finished split 
size. The received finished split size is {}, but expected is {}, truncate it",
+                        subtaskId,
+                        receivedTotalFinishedSplitSize,
+                        binlogSplit.getTotalFinishedSplitSize());
+                binlogSplit =
+                        MySqlBinlogSplit.toNormalBinlogSplit(
+                                binlogSplit, receivedTotalFinishedSplitSize);
+                uncompletedBinlogSplits.put(binlogSplit.splitId(), 
binlogSplit);
+            } else if (receivedMetaGroupId == expectedMetaGroupId) {
                 List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
                 Set<String> existedSplitsOfLastGroup =
                         getExistedSplitsOfLastGroup(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
index d67d1989f..033844ab3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
 import io.debezium.relational.history.TableChanges.TableChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -29,10 +31,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** The split to describe the binlog of MySql table(s). */
 public class MySqlBinlogSplit extends MySqlSplit {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlBinlogSplit.class);
 
     private final BinlogOffset startingOffset;
     private final BinlogOffset endingOffset;
@@ -183,9 +187,19 @@ public class MySqlBinlogSplit extends MySqlSplit {
      */
     public static MySqlBinlogSplit filterOutdatedSplitInfos(
             MySqlBinlogSplit binlogSplit, Tables.TableFilter 
currentTableFilter) {
+        Set<TableId> tablesToRemove =
+                binlogSplit.getFinishedSnapshotSplitInfos().stream()
+                        .filter(i -> 
!currentTableFilter.isIncluded(i.getTableId()))
+                        .map(split -> split.getTableId())
+                        .collect(Collectors.toSet());
+        if (tablesToRemove.isEmpty()) {
+            return binlogSplit;
+        }
+
+        LOG.info("Reader remove tables after restart: {}", tablesToRemove);
         List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
                 binlogSplit.getFinishedSnapshotSplitInfos().stream()
-                        .filter(i -> 
currentTableFilter.isIncluded(i.getTableId()))
+                        .filter(i -> !tablesToRemove.contains(i.getTableId()))
                         .collect(Collectors.toList());
         return new MySqlBinlogSplit(
                 binlogSplit.splitId,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
index 2d78d733d..ac23918d2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
@@ -57,7 +57,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
-import java.lang.reflect.Field;
 import java.sql.SQLException;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -893,6 +892,7 @@ public class NewlyAddedTableITCase extends 
MySqlSourceTestBase {
                         + " 'database-name' = '%s',"
                         + " 'table-name' = '%s',"
                         + " 'scan.incremental.snapshot.chunk.size' = '2',"
+                        + " 'chunk-meta.group.size' = '2',"
                         + " 'server-time-zone' = 'UTC',"
                         + " 'server-id' = '%s',"
                         + " 'scan.newly-added-table.enabled' = 'true'"
@@ -919,19 +919,12 @@ public class NewlyAddedTableITCase extends 
MySqlSourceTestBase {
 
     private StreamExecutionEnvironment getStreamExecutionEnvironment(
             String finishedSavePointPath, int parallelism) throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
         if (finishedSavePointPath != null) {
-            // restore from savepoint
-            // hack for test to visit protected 
TestStreamEnvironment#getConfiguration() method
-            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-            Class<?> clazz =
-                    classLoader.loadClass(
-                            
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
-            Field field = clazz.getDeclaredField("configuration");
-            field.setAccessible(true);
-            Configuration configuration = (Configuration) field.get(env);
             configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
finishedSavePointPath);
         }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(parallelism);
         env.enableCheckpointing(200L);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
index 19e624f5f..c28395f9d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
@@ -43,7 +43,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
-import java.lang.reflect.Field;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -904,19 +903,12 @@ public class NewlyAddedTableITCase extends 
PostgresTestBase {
 
     private StreamExecutionEnvironment 
getStreamExecutionEnvironmentFromSavePoint(
             String finishedSavePointPath, int parallelism) throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
         if (finishedSavePointPath != null) {
-            // restore from savepoint
-            // hack for test to visit protected 
TestStreamEnvironment#getConfiguration() method
-            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-            Class<?> clazz =
-                    classLoader.loadClass(
-                            
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
-            Field field = clazz.getDeclaredField("configuration");
-            field.setAccessible(true);
-            Configuration configuration = (Configuration) field.get(env);
             configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
finishedSavePointPath);
         }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(parallelism);
         env.enableCheckpointing(200L);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
@@ -945,6 +937,7 @@ public class NewlyAddedTableITCase extends PostgresTestBase 
{
                         + " 'table-name' = '%s',"
                         + " 'slot.name' = '%s', "
                         + " 'scan.incremental.snapshot.chunk.size' = '2',"
+                        + " 'chunk-meta.group.size' = '2',"
                         + " 'scan.newly-added-table.enabled' = 'true'"
                         + " %s"
                         + ")",

