Flink CDC Issue Import created FLINK-34857:
----------------------------------------------

             Summary: [Bug] Can't restore from checkpoint because 
assignedSplits does not equal to splitFinishedOffsets
                 Key: FLINK-34857
                 URL: https://issues.apache.org/jira/browse/FLINK-34857
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Flink version

1.14.5

### Flink CDC version

2.4.0

### Database and its version

MySQL

### Minimal reproduce step

The job can't be restored from the checkpoint. To reproduce, unzip 
[ck.tar.gz|https://github.com/ververica/flink-cdc-connectors/files/13818472/ck.tar.gz]
and restore from the _metadata file.

The following unit test can be used to inspect the checkpoint state while debugging:
```
@Test
public void testRestoreFailed() throws IOException {
    String checkpoint = "/Path/to/_metadata";
    Path chkPath = new Path(checkpoint);
    InputStream in = chkPath.getFileSystem().open(chkPath);
    DataInputStream inputStream = new DataInputStream(in);
    CheckpointMetadata metadata =
            Checkpoints.loadCheckpointMetadata(
                    inputStream, Thread.currentThread().getContextClassLoader(), checkpoint);
    HybridSourceEnumeratorStateSerializer serializer =
            new HybridSourceEnumeratorStateSerializer();
    PendingSplitsStateSerializer splitsStateSerializer =
            new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
    for (OperatorState operatorState : metadata.getOperatorStates()) {
        ByteStreamStateHandle coordinatorState = operatorState.getCoordinatorState();
        // Only inspect the coordinator state of the MySQL CDC source operator.
        if (coordinatorState != null
                && "6dc0226b15c44c9c2e1f9ea1a65fd400"
                        .equals(operatorState.getOperatorID().toHexString())) {

            HybridSourceEnumeratorState hybridSourceEnumeratorState =
                    serializer.deserialize(0, coordinatorState.getData());
            System.out.println(hybridSourceEnumeratorState);
            HybridPendingSplitsState pendingSplitsState =
                    (HybridPendingSplitsState)
                            splitsStateSerializer.deserialize(
                                    hybridSourceEnumeratorState.getWrappedStateSerializerVersion(),
                                    hybridSourceEnumeratorState.getWrappedState());

            System.out.println("================================================================================");
            // Print every assigned split that has no entry in splitFinishedOffsets,
            // and collect the tables that the assigned splits belong to.
            Set<String> splits = new HashSet<>();
            for (MySqlSchemalessSnapshotSplit split :
                    pendingSplitsState.getSnapshotPendingSplits().getAssignedSplits().values()) {
                if (!pendingSplitsState
                        .getSnapshotPendingSplits()
                        .getSplitFinishedOffsets()
                        .containsKey(split.splitId())) {
                    System.out.println(split);
                }
                splits.add(split.getTableId().identifier());
            }

            System.out.println("================================================================================");
            // Collect the tables already marked as processed, for comparison.
            Set<String> tables = new HashSet<>();
            for (TableId id :
                    pendingSplitsState.getSnapshotPendingSplits().getAlreadyProcessedTables()) {
                tables.add(id.identifier());
            }

            System.out.println("================================");
            System.out.println(splits);
            System.out.println("================================");
            System.out.println(tables);
            System.out.println("================================");
            // System.out.println(pendingSplitsState);
        }
    }
}
```
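
The core of the check in the test above is a key comparison between the two maps. As a minimal sketch, with plain `java.util` maps standing in for the connector's split and offset types (the class name `SplitStateCheck`, the method name, and the sample split IDs are made up for illustration and are not part of Flink CDC), the same comparison can be tried without a checkpoint file:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the check in the unit test above. Plain maps
// replace the connector's split and offset types; nothing here is Flink CDC API.
final class SplitStateCheck {

    /** Returns the split IDs that are assigned but have no finished offset recorded. */
    static Set<String> missingFinishedOffsets(
            Map<String, ?> assignedSplits, Map<String, ?> splitFinishedOffsets) {
        Set<String> missing = new TreeSet<>(assignedSplits.keySet());
        missing.removeAll(splitFinishedOffsets.keySet());
        return missing;
    }

    public static void main(String[] args) {
        // Made-up sample data: two assigned splits, only one finished offset.
        Map<String, String> assigned = new LinkedHashMap<>();
        assigned.put("db.t1:0", "chunk 0");
        assigned.put("db.t1:1", "chunk 1");
        Map<String, Long> finished = new LinkedHashMap<>();
        finished.put("db.t1:0", 154L);

        // Prints the IDs that would need investigating.
        System.out.println(missingFinishedOffsets(assigned, finished));
    }
}
```

An empty result means every assigned split has a finished offset. A non-empty result corresponds to the splits printed between the two long separator lines by the test above.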

### What did you expect to see?

The job restores from the checkpoint successfully.

### What did you see instead?

Restoring fails with the following exception:
```
java.lang.NullPointerException: null
        at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.createBinlogSplit(MySqlHybridSplitAssigner.java:208) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
        at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:112) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
        at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:199) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
        at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:104) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:157) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
```

### Anything else?

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2958
Created by: [klion26|https://github.com/klion26]
Labels: bug
Created at: Wed Jan 03 19:40:17 CST 2024
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
