This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 48ca8623b [FLINK-34634][cdc-base][mysql] Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (#3134)
48ca8623b is described below
commit 48ca8623bb8fa405adb56dbe505dbad10902db89
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Apr 12 10:15:46 2024 +0800
    [FLINK-34634][cdc-base][mysql] Fix that Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed (#3134)
---
.../source/assigner/SnapshotSplitAssigner.java | 21 ++++++++++----
.../enumerator/IncrementalSourceEnumerator.java | 32 +++++++++++++++++-----
.../source/meta/events/StreamSplitMetaEvent.java | 15 +++++++++-
.../meta/events/StreamSplitMetaRequestEvent.java | 10 ++++++-
.../base/source/meta/split/StreamSplit.java | 17 +++++++++++-
.../source/reader/IncrementalSourceReader.java | 14 ++++++++--
.../mongodb/source/NewlyAddedTableITCase.java | 17 +++---------
.../assigners/MySqlSnapshotSplitAssigner.java | 1 +
.../source/enumerator/MySqlSourceEnumerator.java | 32 +++++++++++++++++-----
.../mysql/source/events/BinlogSplitMetaEvent.java | 15 +++++++++-
.../source/events/BinlogSplitMetaRequestEvent.java | 10 ++++++-
.../mysql/source/reader/MySqlSourceReader.java | 16 +++++++++--
.../mysql/source/split/MySqlBinlogSplit.java | 16 ++++++++++-
.../mysql/source/NewlyAddedTableITCase.java | 15 +++-------
.../postgres/source/NewlyAddedTableITCase.java | 15 +++-------
15 files changed, 181 insertions(+), 65 deletions(-)
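In short: the snapshot assigner now keeps assigned splits in a key-sorted LinkedHashMap so reader and enumerator agree on split ordering across restarts; the meta request/response events carry a totalFinishedSplitSize field; and when a reader reports more finished splits than the enumerator knows (tables were dropped from the config before meta synchronization completed), the enumerator answers with a null meta group plus its own count, which the reader uses to truncate its stream split. The Java sketch below illustrates only that handshake; the names MetaHandshakeSketch, MetaReply, handleRequest and applyReply are invented for the example and are not the Flink CDC API.

    // Minimal sketch (assumed names, not the connector API) of the
    // reader/enumerator reconciliation this commit introduces.
    import java.util.ArrayList;
    import java.util.List;

    public class MetaHandshakeSketch {

        // Reply carrying either a metadata group or, when payload is null,
        // an instruction to truncate to the enumerator's finished-split count.
        static class MetaReply {
            final List<String> payload; // null means "truncate"
            final int enumeratorTotal;

            MetaReply(List<String> payload, int enumeratorTotal) {
                this.payload = payload;
                this.enumeratorTotal = enumeratorTotal;
            }
        }

        // Enumerator side: a reader that claims more finished splits than the
        // enumerator has gets a truncation reply instead of a failed request.
        static MetaReply handleRequest(
                int readerTotal, int enumeratorTotal, List<String> groupToSend) {
            if (readerTotal > enumeratorTotal) {
                return new MetaReply(null, enumeratorTotal);
            }
            return new MetaReply(groupToSend, enumeratorTotal);
        }

        // Reader side: drop finished-split infos beyond the enumerator's total,
        // then continue requesting the remaining metadata groups as usual.
        static List<String> applyReply(List<String> readerInfos, MetaReply reply) {
            if (reply.enumeratorTotal < readerInfos.size()) {
                return new ArrayList<>(readerInfos.subList(0, reply.enumeratorTotal));
            }
            return readerInfos;
        }

        public static void main(String[] args) {
            List<String> readerInfos = new ArrayList<>(List.of("t1:0", "t1:1", "t2:0"));
            // Enumerator restored without table t2: only 2 finished splits remain.
            MetaReply reply = handleRequest(readerInfos.size(), 2, null);
            System.out.println(applyReply(readerInfos, reply)); // [t1:0, t1:1]
        }
    }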
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index 48c55cd8a..d424e89b7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -39,10 +39,10 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -92,7 +92,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
currentParallelism,
new ArrayList<>(),
new ArrayList<>(),
- new HashMap<>(),
+ new LinkedHashMap<>(),
new HashMap<>(),
new HashMap<>(),
INITIAL_ASSIGNING,
@@ -143,7 +143,17 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
- this.assignedSplits = assignedSplits;
+        // When job restore from savepoint, sort the existing tables and newly added tables
+ // to let enumerator only send newly added tables' StreamSplitMetaEvent
+ this.assignedSplits =
+ assignedSplits.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue,
+ (o, o2) -> o,
+ LinkedHashMap::new));
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
@@ -230,6 +240,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
tableSchemas
.entrySet()
                    .removeIf(schema -> tablesToRemove.contains(schema.getKey()));
+ LOG.info("Enumerator remove tables after restart: {}",
tablesToRemove);
            remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
remainingTables.removeAll(tablesToRemove);
            alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
@@ -303,9 +314,7 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig
"The assigner is not ready to offer finished split
information, this should not be called");
}
final List<SchemalessSnapshotSplit> assignedSnapshotSplit =
- assignedSplits.values().stream()
- .sorted(Comparator.comparing(SourceSplitBase::splitId))
- .collect(Collectors.toList());
+ new ArrayList<>(assignedSplits.values());
        List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
Offset finishedOffset = splitFinishedOffsets.get(split.splitId());
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index af137ab40..05dcc6bd2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -307,8 +307,22 @@ public class IncrementalSourceEnumerator
finishedSnapshotSplitInfos,
sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
- if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
+ LOG.warn(
+ "Total finished split size of subtask {} is {}, while
total finished split size of enumerator is only {}. Try to truncate it",
+ subTask,
+ totalFinishedSplitSizeOfReader,
+ totalFinishedSplitSizeOfEnumerator);
+ StreamSplitMetaEvent metadataEvent =
+ new StreamSplitMetaEvent(
+ requestEvent.getSplitId(),
+ requestMetaGroupId,
+ null,
+ totalFinishedSplitSizeOfEnumerator);
+ context.sendEventToSourceReader(subTask, metadataEvent);
+ } else if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaToSend =
finishedSnapshotSplitMeta.get(requestMetaGroupId);
StreamSplitMetaEvent metadataEvent =
@@ -317,13 +331,17 @@ public class IncrementalSourceEnumerator
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
- LOG.error(
- "Received invalid request meta group id {}, the invalid
meta group id range is [0, {}]",
- requestMetaGroupId,
- finishedSnapshotSplitMeta.size() - 1);
+ throw new FlinkRuntimeException(
+ String.format(
+ "The enumerator received invalid request meta
group id %s, the valid meta group id range is [0, %s]. Total finished split
size of reader is %s, while the total finished split size of enumerator is %s.",
+ requestMetaGroupId,
+ finishedSnapshotSplitMeta.size() - 1,
+ totalFinishedSplitSizeOfReader,
+ totalFinishedSplitSizeOfEnumerator));
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
index 4bf1922f6..8f9f580e9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java
@@ -22,6 +22,8 @@ import org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceE
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import javax.annotation.Nullable;
+
import java.util.List;
/**
@@ -43,10 +45,17 @@ public class StreamSplitMetaEvent implements SourceEvent {
*/
private final List<byte[]> metaGroup;
-    public StreamSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
+ private final int totalFinishedSplitSize;
+
+ public StreamSplitMetaEvent(
+ String splitId,
+ int metaGroupId,
+ @Nullable List<byte[]> metaGroup,
+ int totalFinishedSplitSize) {
this.splitId = splitId;
this.metaGroupId = metaGroupId;
this.metaGroup = metaGroup;
+ this.totalFinishedSplitSize = totalFinishedSplitSize;
}
public String getSplitId() {
@@ -60,4 +69,8 @@ public class StreamSplitMetaEvent implements SourceEvent {
public List<byte[]> getMetaGroup() {
return metaGroup;
}
+
+ public int getTotalFinishedSplitSize() {
+ return totalFinishedSplitSize;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
index 272f1967a..8d657c273 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaRequestEvent.java
@@ -33,9 +33,13 @@ public class StreamSplitMetaRequestEvent implements SourceEvent {
private final String splitId;
private final int requestMetaGroupId;
-    public StreamSplitMetaRequestEvent(String splitId, int requestMetaGroupId) {
+ private final int totalFinishedSplitSize;
+
+ public StreamSplitMetaRequestEvent(
+            String splitId, int requestMetaGroupId, int totalFinishedSplitSize) {
this.splitId = splitId;
this.requestMetaGroupId = requestMetaGroupId;
+ this.totalFinishedSplitSize = totalFinishedSplitSize;
}
public String getSplitId() {
@@ -45,4 +49,8 @@ public class StreamSplitMetaRequestEvent implements SourceEvent {
public int getRequestMetaGroupId() {
return requestMetaGroupId;
}
+
+ public int getTotalFinishedSplitSize() {
+ return totalFinishedSplitSize;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java
index 53bc21d68..f4143364a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java
@@ -21,6 +21,8 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -29,11 +31,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/** The split to describe the change log of database table(s). */
public class StreamSplit extends SourceSplitBase {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamSplit.class);
public static final String STREAM_SPLIT_ID = "stream-split";
private final Offset startingOffset;
@@ -179,9 +183,20 @@ public class StreamSplit extends SourceSplitBase {
*/
public static StreamSplit filterOutdatedSplitInfos(
StreamSplit streamSplit, Predicate<TableId> currentTableFilter) {
+
+ Set<TableId> tablesToRemove =
+ streamSplit.getFinishedSnapshotSplitInfos().stream()
+ .filter(i -> !currentTableFilter.test(i.getTableId()))
+ .map(split -> split.getTableId())
+ .collect(Collectors.toSet());
+ if (tablesToRemove.isEmpty()) {
+ return streamSplit;
+ }
+
+ LOG.info("Reader remove tables after restart: {}", tablesToRemove);
List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
streamSplit.getFinishedSnapshotSplitInfos().stream()
- .filter(i -> currentTableFilter.test(i.getTableId()))
+ .filter(i -> !tablesToRemove.contains(i.getTableId()))
.collect(Collectors.toList());
        Map<TableId, TableChange> previousTableSchemas = streamSplit.getTableSchemas();
Map<TableId, TableChange> newTableSchemas = new HashMap<>();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java
index de0147532..7be9785fd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java
@@ -418,11 +418,20 @@ public class IncrementalSourceReader<T, C extends SourceConfig>
        StreamSplit streamSplit = uncompletedStreamSplits.get(metadataEvent.getSplitId());
if (streamSplit != null) {
final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+            final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
final int expectedMetaGroupId =
getNextMetaGroupId(
streamSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
- if (receivedMetaGroupId == expectedMetaGroupId) {
+            if (receivedTotalFinishedSplitSize < streamSplit.getTotalFinishedSplitSize()) {
+ LOG.warn(
+ "Source reader {} receives out of bound finished split
size. The received finished split size is {}, but expected is {}, truncate it",
+ subtaskId,
+ receivedTotalFinishedSplitSize,
+ streamSplit.getTotalFinishedSplitSize());
+                streamSplit = toNormalStreamSplit(streamSplit, receivedTotalFinishedSplitSize);
+                uncompletedStreamSplits.put(streamSplit.splitId(), streamSplit);
+ } else if (receivedMetaGroupId == expectedMetaGroupId) {
Set<String> existedSplitsOfLastGroup =
getExistedSplitsOfLastGroup(
streamSplit.getFinishedSnapshotSplitInfos(),
@@ -461,7 +470,8 @@ public class IncrementalSourceReader<T, C extends SourceConfig>
streamSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
StreamSplitMetaRequestEvent splitMetaRequestEvent =
- new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId);
+ new StreamSplitMetaRequestEvent(
+                            splitId, nextMetaGroupId, streamSplit.getTotalFinishedSplitSize());
context.sendSourceEventToCoordinator(splitMetaRequestEvent);
} else {
LOG.info("The meta of stream split {} has been collected success",
splitId);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
index dc86f637a..3b1964856 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java
@@ -42,7 +42,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
-import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -948,20 +947,11 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
// Close sink upsert materialize to show more clear test output.
Configuration tableConfig = new Configuration();
tableConfig.setString("table.exec.sink.upsert-materialize", "none");
-        StreamExecutionEnvironment env =
-                StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
if (finishedSavePointPath != null) {
- // restore from savepoint
-            // hack for test to visit protected TestStreamEnvironment#getConfiguration() method
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- Class<?> clazz =
- classLoader.loadClass(
-                            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
- Field field = clazz.getDeclaredField("configuration");
- field.setAccessible(true);
- Configuration configuration = (Configuration) field.get(env);
-            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
+            tableConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
@@ -987,6 +977,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s',"
+ + " 'chunk-meta.group.size' = '2',"
+ " 'heartbeat.interval.ms' = '100',"
+ " 'scan.full-changelog' = 'true',"
+ " 'scan.newly-added-table.enabled' = 'true'"
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 1cefdfe8c..e2dd426a6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -253,6 +253,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
.entrySet()
                    .removeIf(schema -> tablesToRemove.contains(schema.getKey()));
            remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
+ LOG.info("Enumerator remove tables after restart: {}",
tablesToRemove);
remainingTables.removeAll(tablesToRemove);
            alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index a31197805..50d7607b6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -298,8 +298,22 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
finishedSnapshotSplitInfos,
sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();
-
- if (binlogSplitMeta.size() > requestMetaGroupId) {
+        final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
+        final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
+        if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
+ LOG.warn(
+ "Total finished split size of subtask {} is {}, while
total finished split size of Enumerator is only {}. Try to truncate it",
+ subTask,
+ totalFinishedSplitSizeOfReader,
+ totalFinishedSplitSizeOfEnumerator);
+ BinlogSplitMetaEvent metadataEvent =
+ new BinlogSplitMetaEvent(
+ requestEvent.getSplitId(),
+ requestMetaGroupId,
+ null,
+ totalFinishedSplitSizeOfEnumerator);
+ context.sendEventToSourceReader(subTask, metadataEvent);
+ } else if (binlogSplitMeta.size() > requestMetaGroupId) {
            List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
BinlogSplitMetaEvent metadataEvent =
new BinlogSplitMetaEvent(
@@ -307,13 +321,17 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
- LOG.error(
- "The enumerator received invalid request meta group id {},
the valid meta group id range is [0, {}]",
- requestMetaGroupId,
- binlogSplitMeta.size() - 1);
+ throw new FlinkRuntimeException(
+ String.format(
+ "The enumerator received invalid request meta
group id %s, the valid meta group id range is [0, %s]. Total finished split
size of reader is %s, while the total finished split size of enumerator is %s.",
+ requestMetaGroupId,
+ binlogSplitMeta.size() - 1,
+ totalFinishedSplitSizeOfReader,
+ totalFinishedSplitSizeOfEnumerator));
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
index 60688bbd0..2c8fe0b1e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java
@@ -22,6 +22,8 @@ import org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumer
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+import javax.annotation.Nullable;
+
import java.util.List;
/**
@@ -43,10 +45,17 @@ public class BinlogSplitMetaEvent implements SourceEvent {
*/
private final List<byte[]> metaGroup;
-    public BinlogSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
+ private final int totalFinishedSplitSize;
+
+ public BinlogSplitMetaEvent(
+ String splitId,
+ int metaGroupId,
+ @Nullable List<byte[]> metaGroup,
+ int totalFinishedSplitSize) {
this.splitId = splitId;
this.metaGroupId = metaGroupId;
this.metaGroup = metaGroup;
+ this.totalFinishedSplitSize = totalFinishedSplitSize;
}
public String getSplitId() {
@@ -60,4 +69,8 @@ public class BinlogSplitMetaEvent implements SourceEvent {
public List<byte[]> getMetaGroup() {
return metaGroup;
}
+
+ public int getTotalFinishedSplitSize() {
+ return totalFinishedSplitSize;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java
index d3a2b8f33..6ffef8679 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaRequestEvent.java
@@ -32,9 +32,13 @@ public class BinlogSplitMetaRequestEvent implements SourceEvent {
private final String splitId;
private final int requestMetaGroupId;
-    public BinlogSplitMetaRequestEvent(String splitId, int requestMetaGroupId) {
+ private final int totalFinishedSplitSize;
+
+ public BinlogSplitMetaRequestEvent(
+            String splitId, int requestMetaGroupId, int totalFinishedSplitSize) {
this.splitId = splitId;
this.requestMetaGroupId = requestMetaGroupId;
+ this.totalFinishedSplitSize = totalFinishedSplitSize;
}
public String getSplitId() {
@@ -44,4 +48,8 @@ public class BinlogSplitMetaRequestEvent implements SourceEvent {
public int getRequestMetaGroupId() {
return requestMetaGroupId;
}
+
+ public int getTotalFinishedSplitSize() {
+ return totalFinishedSplitSize;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index 6df83a648..253d9dc62 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -383,7 +383,8 @@ public class MySqlSourceReader<T>
binlogSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
BinlogSplitMetaRequestEvent splitMetaRequestEvent =
- new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
+ new BinlogSplitMetaRequestEvent(
+                            splitId, nextMetaGroupId, binlogSplit.getTotalFinishedSplitSize());
context.sendSourceEventToCoordinator(splitMetaRequestEvent);
} else {
LOG.info("Source reader {} collects meta of binlog split success",
subtaskId);
@@ -395,11 +396,22 @@ public class MySqlSourceReader<T>
        MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
if (binlogSplit != null) {
final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+            final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
final int expectedMetaGroupId =
ChunkUtils.getNextMetaGroupId(
binlogSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
- if (receivedMetaGroupId == expectedMetaGroupId) {
+            if (receivedTotalFinishedSplitSize < binlogSplit.getTotalFinishedSplitSize()) {
+ LOG.warn(
+ "Source reader {} receives out of bound finished split
size. The received finished split size is {}, but expected is {}, truncate it",
+ subtaskId,
+ receivedTotalFinishedSplitSize,
+ binlogSplit.getTotalFinishedSplitSize());
+ binlogSplit =
+ MySqlBinlogSplit.toNormalBinlogSplit(
+ binlogSplit, receivedTotalFinishedSplitSize);
+                uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
+ } else if (receivedMetaGroupId == expectedMetaGroupId) {
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
Set<String> existedSplitsOfLastGroup =
getExistedSplitsOfLastGroup(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
index d67d1989f..033844ab3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
@@ -22,6 +22,8 @@ import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges.TableChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -29,10 +31,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
/** The split to describe the binlog of MySql table(s). */
public class MySqlBinlogSplit extends MySqlSplit {
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplit.class);
private final BinlogOffset startingOffset;
private final BinlogOffset endingOffset;
@@ -183,9 +187,19 @@ public class MySqlBinlogSplit extends MySqlSplit {
*/
public static MySqlBinlogSplit filterOutdatedSplitInfos(
            MySqlBinlogSplit binlogSplit, Tables.TableFilter currentTableFilter) {
+ Set<TableId> tablesToRemove =
+ binlogSplit.getFinishedSnapshotSplitInfos().stream()
+                        .filter(i -> !currentTableFilter.isIncluded(i.getTableId()))
+ .map(split -> split.getTableId())
+ .collect(Collectors.toSet());
+ if (tablesToRemove.isEmpty()) {
+ return binlogSplit;
+ }
+
+ LOG.info("Reader remove tables after restart: {}", tablesToRemove);
List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
binlogSplit.getFinishedSnapshotSplitInfos().stream()
-                        .filter(i -> currentTableFilter.isIncluded(i.getTableId()))
+ .filter(i -> !tablesToRemove.contains(i.getTableId()))
.collect(Collectors.toList());
return new MySqlBinlogSplit(
binlogSplit.splitId,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
index 2d78d733d..ac23918d2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
@@ -57,7 +57,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
-import java.lang.reflect.Field;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -893,6 +892,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ + " 'chunk-meta.group.size' = '2',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s',"
+ " 'scan.newly-added-table.enabled' = 'true'"
@@ -919,19 +919,12 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
private StreamExecutionEnvironment getStreamExecutionEnvironment(
String finishedSavePointPath, int parallelism) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration configuration = new Configuration();
if (finishedSavePointPath != null) {
- // restore from savepoint
-            // hack for test to visit protected TestStreamEnvironment#getConfiguration() method
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- Class<?> clazz =
- classLoader.loadClass(
-                            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
- Field field = clazz.getDeclaredField("configuration");
- field.setAccessible(true);
- Configuration configuration = (Configuration) field.get(env);
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
index 19e624f5f..c28395f9d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/NewlyAddedTableITCase.java
@@ -43,7 +43,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
-import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -904,19 +903,12 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
    private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint(
String finishedSavePointPath, int parallelism) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration configuration = new Configuration();
if (finishedSavePointPath != null) {
- // restore from savepoint
-            // hack for test to visit protected TestStreamEnvironment#getConfiguration() method
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- Class<?> clazz =
- classLoader.loadClass(
-                            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
- Field field = clazz.getDeclaredField("configuration");
- field.setAccessible(true);
- Configuration configuration = (Configuration) field.get(env);
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
@@ -945,6 +937,7 @@ public class NewlyAddedTableITCase extends PostgresTestBase {
+ " 'table-name' = '%s',"
+ " 'slot.name' = '%s', "
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ + " 'chunk-meta.group.size' = '2',"
+ " 'scan.newly-added-table.enabled' = 'true'"
+ " %s"
+ ")",