This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d3f8e4627 [Bugfix][CDC Base] Fix NPE caused by adding a table for restore job (#6145)
8d3f8e4627 is described below

commit 8d3f8e46271631903f1c3458801c57d9c53ccf0e
Author: ic4y <[email protected]>
AuthorDate: Tue Jan 9 10:41:18 2024 +0800

    [Bugfix][CDC Base] Fix NPE caused by adding a table for restore job (#6145)
---
 .../connectors/cdc/base/source/IncrementalSource.java  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 11769ab87a..f62ce75d02 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -311,6 +311,24 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
                     
checkpointSnapshotState.getAssignedSplits().remove(splitId);
                     
checkpointSnapshotState.getSplitCompletedOffsets().remove(splitId);
                 });
+
+        if ((!checkpointSnapshotState.getRemainingTables().isEmpty()
+                        || 
!checkpointSnapshotState.getRemainingSplits().isEmpty())
+                && checkpointSnapshotState.isAssignerCompleted()) {
+            // If there are still unprocessed tables or splits, and the assigner has completed, the
+            // assigner status needs to be reset
+            return new HybridPendingSplitsState(
+                    new SnapshotPhaseState(
+                            
checkpointSnapshotState.getAlreadyProcessedTables(),
+                            checkpointSnapshotState.getRemainingSplits(),
+                            checkpointSnapshotState.getAssignedSplits(),
+                            checkpointSnapshotState.getSplitCompletedOffsets(),
+                            false,
+                            checkpointSnapshotState.getRemainingTables(),
+                            checkpointSnapshotState.isTableIdCaseSensitive(),
+                            
checkpointSnapshotState.isRemainingTablesCheckpointed()),
+                    checkpointState.getIncrementalPhaseState());
+        }
         return checkpointState;
     }
 }

Reply via email to