Flink CDC Issue Import created FLINK-34857:
----------------------------------------------
Summary: [Bug] Can't restore from checkpoint because
assignedSplits is not equal to splitFinishedOffsets
Key: FLINK-34857
URL: https://issues.apache.org/jira/browse/FLINK-34857
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found
nothing similar.
### Flink version
1.14.5
### Flink CDC version
2.4.0
### Database and its version
MySQL
### Minimal reproduce step
Restoring from the checkpoint fails. To reproduce, unzip
[ck.tar.gz|https://github.com/ververica/flink-cdc-connectors/files/13818472/ck.tar.gz]
and restore from the `_metadata` file.
The following unit test can be used to debug the restored enumerator state:
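```
// Assumes Flink 1.14.x (flink-runtime, flink-connector-base) and Flink CDC 2.4.0 on the test classpath
import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import io.debezium.relational.TableId;
import org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorState;
import org.apache.flink.connector.base.source.hybrid.HybridSourceEnumeratorStateSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Test;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;

@Test
public void testRestoreFailed() throws IOException {
    String checkpoint = "/Path/to/_metadata";
    Path chkPath = new Path(checkpoint);
    HybridSourceEnumeratorStateSerializer serializer = new HybridSourceEnumeratorStateSerializer();
    PendingSplitsStateSerializer splitsStateSerializer =
            new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);

    // try-with-resources closes the _metadata stream once it has been read
    try (InputStream in = chkPath.getFileSystem().open(chkPath);
            DataInputStream inputStream = new DataInputStream(in)) {
        CheckpointMetadata metadata =
                Checkpoints.loadCheckpointMetadata(
                        inputStream, Thread.currentThread().getContextClassLoader(), checkpoint);

        for (OperatorState operatorState : metadata.getOperatorStates()) {
            ByteStreamStateHandle coordinatorState = operatorState.getCoordinatorState();
            // Only inspect the coordinator (enumerator) state of the MySQL CDC source operator
            if (coordinatorState != null
                    && "6dc0226b15c44c9c2e1f9ea1a65fd400"
                            .equals(operatorState.getOperatorID().toHexString())) {
                HybridSourceEnumeratorState hybridSourceEnumeratorState =
                        serializer.deserialize(0, coordinatorState.getData());
                System.out.println(hybridSourceEnumeratorState);

                // Unwrap the MySQL CDC enumerator state stored inside the hybrid source state
                HybridPendingSplitsState pendingSplitsState =
                        (HybridPendingSplitsState)
                                splitsStateSerializer.deserialize(
                                        hybridSourceEnumeratorState.getWrappedStateSerializerVersion(),
                                        hybridSourceEnumeratorState.getWrappedState());
                SnapshotPendingSplitsState snapshotState =
                        pendingSplitsState.getSnapshotPendingSplits();
                System.out.println("================================================================================");

                // Print every assigned split that has no entry in splitFinishedOffsets,
                // and collect the tables that the assigned splits belong to
                Set<String> splits = new HashSet<>();
                for (MySqlSchemalessSnapshotSplit split : snapshotState.getAssignedSplits().values()) {
                    if (!snapshotState.getSplitFinishedOffsets().containsKey(split.splitId())) {
                        System.out.println(split);
                    }
                    splits.add(split.getTableId().identifier());
                }
                System.out.println("================================================================================");

                // Collect the tables the snapshot phase has already processed
                Set<String> tables = new HashSet<>();
                for (TableId id : snapshotState.getAlreadyProcessedTables()) {
                    tables.add(id.identifier());
                }
                System.out.println("================================");
                System.out.println(splits);
                System.out.println("================================");
                System.out.println(tables);
                System.out.println("================================");
                // System.out.println(pendingSplitsState);
            }
        }
    }
}
```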
### What did you expect to see?
The job restores from the checkpoint successfully.
### What did you see instead?
The restore fails with the following exception:
```
java.lang.NullPointerException: null
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.createBinlogSplit(MySqlHybridSplitAssigner.java:208) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
    at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:112) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
    at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:199) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
    at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:104) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:157) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
```
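The summary and the stack trace are consistent with the following failure mode, although this is an assumption that has not been checked against the `MySqlHybridSplitAssigner` source: when an assigned snapshot split has no entry in `splitFinishedOffsets`, a `Map.get` lookup returns `null`, and the later dereference of that value throws the `NullPointerException`. The sketch below models only that pattern with plain Java collections. `FinishedOffsetCheck`, `Offset`, `minOffsetUnchecked` and `minOffsetChecked` are hypothetical names, not connector classes.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of the suspected failure mode; not the connector's implementation. */
public class FinishedOffsetCheck {

    /** Hypothetical stand-in for a binlog offset; only the ordering matters here. */
    public static final class Offset {
        public final long position;

        public Offset(long position) {
            this.position = position;
        }

        public boolean isBefore(Offset other) {
            return position < other.position;
        }
    }

    /** Returns the IDs of assigned splits that have no entry in the finished-offsets map. */
    public static Set<String> missingFinishedOffsets(
            Set<String> assignedSplitIds, Map<String, Offset> finishedOffsets) {
        Set<String> missing = new TreeSet<>(assignedSplitIds);
        missing.removeAll(finishedOffsets.keySet());
        return missing;
    }

    /** Takes the minimum finished offset without checking that every entry exists. */
    public static Offset minOffsetUnchecked(
            Set<String> assignedSplitIds, Map<String, Offset> finishedOffsets) {
        Offset min = null;
        for (String id : assignedSplitIds) {
            Offset offset = finishedOffsets.get(id); // null if the split has no finished offset
            // Dereferencing a null offset throws NullPointerException once min is set
            if (min == null || offset.isBefore(min)) {
                min = offset;
            }
        }
        return min;
    }

    /** Same computation, but fails fast and names the inconsistent splits. */
    public static Offset minOffsetChecked(
            Set<String> assignedSplitIds, Map<String, Offset> finishedOffsets) {
        Set<String> missing = missingFinishedOffsets(assignedSplitIds, finishedOffsets);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Assigned splits without a finished offset: " + missing);
        }
        return minOffsetUnchecked(assignedSplitIds, finishedOffsets);
    }

    public static void main(String[] args) {
        // Iteration order matters: the NPE needs a present offset before the missing one
        Set<String> assigned = new LinkedHashSet<>(List.of("db.t:0", "db.t:1"));
        Map<String, Offset> finished = new HashMap<>();
        finished.put("db.t:0", new Offset(100L));

        System.out.println(missingFinishedOffsets(assigned, finished)); // prints [db.t:1]
        try {
            minOffsetUnchecked(assigned, finished);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from the unchecked lookup");
        }
    }
}
```

Comparing the two key sets before computing the minimum turns a bare `NullPointerException` into an error that lists the affected split IDs, which is the same information the debugging test prints by hand.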
### Anything else?
_No response_
### Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2958
Created by: [klion26|https://github.com/klion26]
Labels: bug
Created at: Wed Jan 03 19:40:17 CST 2024
State: open