This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8d3f8e4627 [Bugfix][CDC Base] Fix NPE caused by adding a table for
restore job (#6145)
8d3f8e4627 is described below
commit 8d3f8e46271631903f1c3458801c57d9c53ccf0e
Author: ic4y <[email protected]>
AuthorDate: Tue Jan 9 10:41:18 2024 +0800
[Bugfix][CDC Base] Fix NPE caused by adding a table for restore job (#6145)
---
.../connectors/cdc/base/source/IncrementalSource.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 11769ab87a..f62ce75d02 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -311,6 +311,24 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
checkpointSnapshotState.getAssignedSplits().remove(splitId);
checkpointSnapshotState.getSplitCompletedOffsets().remove(splitId);
});
+
+ if ((!checkpointSnapshotState.getRemainingTables().isEmpty()
+ ||
!checkpointSnapshotState.getRemainingSplits().isEmpty())
+ && checkpointSnapshotState.isAssignerCompleted()) {
+ // If there are still unprocessed tables or splits, and the assigner has completed, the
+ // assigner status needs to be reset
+ return new HybridPendingSplitsState(
+ new SnapshotPhaseState(
+
checkpointSnapshotState.getAlreadyProcessedTables(),
+ checkpointSnapshotState.getRemainingSplits(),
+ checkpointSnapshotState.getAssignedSplits(),
+ checkpointSnapshotState.getSplitCompletedOffsets(),
+ false,
+ checkpointSnapshotState.getRemainingTables(),
+ checkpointSnapshotState.isTableIdCaseSensitive(),
+
checkpointSnapshotState.isRemainingTablesCheckpointed()),
+ checkpointState.getIncrementalPhaseState());
+ }
return checkpointState;
}
}