This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b6f7e773b [Bug][seatunnel-translation-base] Fix Source restore state
NPE (#2878)
b6f7e773b is described below
commit b6f7e773bb0808b23ac0cd4bb0a628c2772f7c5b
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 26 20:44:19 2022 +0800
[Bug][seatunnel-translation-base] Fix Source restore state NPE (#2878)
---
.../seatunnel/translation/source/CoordinatedSource.java | 13 +++++++++----
.../apache/seatunnel/translation/source/ParallelSource.java | 13 +++++++++----
2 files changed, 18 insertions(+), 8 deletions(-)
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 3994fc776..2a7dc726a 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -86,7 +86,10 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
private void createSplitEnumerator() throws Exception {
if (restoredState != null && restoredState.size() > 0) {
- StateT restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+ StateT restoredEnumeratorState = null;
+ if (restoredState.containsKey(-1)) {
+ restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+ }
splitEnumerator =
source.restoreEnumerator(coordinatedEnumeratorContext, restoredEnumeratorState);
restoredState.forEach((subtaskId, splitBytes) -> {
if (subtaskId == -1) {
@@ -182,8 +185,6 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
@Override
public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws
Exception {
- StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
- byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(enumeratorState);
Map<Integer, List<byte[]>> allStates = readerMap.entrySet()
.parallelStream()
.collect(Collectors.toMap(
@@ -200,7 +201,11 @@ public class CoordinatedSource<T, SplitT extends
SourceSplit, StateT extends Ser
throw new RuntimeException(e);
}
}));
- allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+ StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+ if (enumeratorState != null) {
+ byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(enumeratorState);
+ allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+ }
return allStates;
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 95aa7f18e..147a5350c 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -79,7 +79,10 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT extends Serial
// Create or restore split enumerator & reader
try {
if (restoredState != null && restoredState.size() > 0) {
- StateT restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+ StateT restoredEnumeratorState = null;
+ if (restoredState.containsKey(-1)) {
+ restoredEnumeratorState =
enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+ }
restoredSplitState = new
ArrayList<>(restoredState.get(subtaskId).size());
for (byte[] splitBytes : restoredState.get(subtaskId)) {
restoredSplitState.add(splitSerializer.deserialize(splitBytes));
@@ -180,12 +183,14 @@ public class ParallelSource<T, SplitT extends
SourceSplit, StateT extends Serial
@Override
public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws
Exception {
- byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(splitEnumerator.snapshotState(checkpointId));
- List<SplitT> splitStates = reader.snapshotState(checkpointId);
Map<Integer, List<byte[]>> allStates = new HashMap<>(2);
- if (enumeratorStateBytes != null) {
+
+ StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+ if (enumeratorState != null) {
+ byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(enumeratorState);
allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
}
+ List<SplitT> splitStates = reader.snapshotState(checkpointId);
if (splitStates != null) {
final List<byte[]> readerStateBytes = new
ArrayList<>(splitStates.size());
for (SplitT splitState : splitStates) {