This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6f7e773b [Bug][seatunnel-translation-base] Fix Source restore state NPE (#2878)
b6f7e773b is described below

commit b6f7e773bb0808b23ac0cd4bb0a628c2772f7c5b
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 26 20:44:19 2022 +0800

    [Bug][seatunnel-translation-base] Fix Source restore state NPE (#2878)
---
 .../seatunnel/translation/source/CoordinatedSource.java     | 13 +++++++++----
 .../apache/seatunnel/translation/source/ParallelSource.java | 13 +++++++++----
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 3994fc776..2a7dc726a 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -86,7 +86,10 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
 
     private void createSplitEnumerator() throws Exception {
         if (restoredState != null && restoredState.size() > 0) {
-            StateT restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+            StateT restoredEnumeratorState = null;
+            if (restoredState.containsKey(-1)) {
+                restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+            }
             splitEnumerator = 
source.restoreEnumerator(coordinatedEnumeratorContext, restoredEnumeratorState);
             restoredState.forEach((subtaskId, splitBytes) -> {
                 if (subtaskId == -1) {
@@ -182,8 +185,6 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
 
     @Override
     public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws 
Exception {
-        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
-        byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
         Map<Integer, List<byte[]>> allStates = readerMap.entrySet()
             .parallelStream()
             .collect(Collectors.toMap(
@@ -200,7 +201,11 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
                         throw new RuntimeException(e);
                     }
                 }));
-        allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+        if (enumeratorState != null) {
+            byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
+            allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+        }
         return allStates;
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 95aa7f18e..147a5350c 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -79,7 +79,10 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT extends Serial
         // Create or restore split enumerator & reader
         try {
             if (restoredState != null && restoredState.size() > 0) {
-                StateT restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+                StateT restoredEnumeratorState = null;
+                if (restoredState.containsKey(-1)) {
+                    restoredEnumeratorState = 
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+                }
                 restoredSplitState = new 
ArrayList<>(restoredState.get(subtaskId).size());
                 for (byte[] splitBytes : restoredState.get(subtaskId)) {
                     
restoredSplitState.add(splitSerializer.deserialize(splitBytes));
@@ -180,12 +183,14 @@ public class ParallelSource<T, SplitT extends 
SourceSplit, StateT extends Serial
 
     @Override
     public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws 
Exception {
-        byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(splitEnumerator.snapshotState(checkpointId));
-        List<SplitT> splitStates = reader.snapshotState(checkpointId);
         Map<Integer, List<byte[]>> allStates = new HashMap<>(2);
-        if (enumeratorStateBytes != null) {
+
+        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+        if (enumeratorState != null) {
+            byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
             allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
         }
+        List<SplitT> splitStates = reader.snapshotState(checkpointId);
         if (splitStates != null) {
             final List<byte[]> readerStateBytes = new 
ArrayList<>(splitStates.size());
             for (SplitT splitState : splitStates) {

Reply via email to