This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new e5570e3e33a [FLINK-28817][connector/common] NullPointerException in
HybridSource when restoring from checkpoint
e5570e3e33a is described below
commit e5570e3e33ac33fd1b31d38c86ac6a291e7bc47e
Author: Qishang Zhong <[email protected]>
AuthorDate: Sat Aug 13 11:21:58 2022 +0800
[FLINK-28817][connector/common] NullPointerException in HybridSource when
restoring from checkpoint
---
.../source/hybrid/HybridSourceSplitEnumerator.java | 6 +++++-
.../connector/base/source/hybrid/SwitchedSources.java | 8 +++++++-
.../hybrid/HybridSourceSplitEnumeratorTest.java | 19 +++++++++++++++++++
3 files changed, 31 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index d27de221af8..61baabeb941 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -218,7 +218,11 @@ public class HybridSourceSplitEnumerator
}
if (subtaskSourceIndex < currentSourceIndex) {
- subtaskSourceIndex++;
+ // find initial or next index for the reader
+ subtaskSourceIndex =
+ subtaskSourceIndex == -1
+ ? switchedSources.getFirstSourceIndex()
+ : ++subtaskSourceIndex;
sendSwitchSourceEvent(subtaskId, subtaskSourceIndex);
return;
}
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
index 7911612d258..68128d1faed 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
@@ -25,10 +25,12 @@ import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
/** Sources that participated in switching with cached serializers. */
class SwitchedSources {
- private final Map<Integer, Source> sources = new HashMap<>();
+ private final SortedMap<Integer, Source> sources = new TreeMap<>();
private final Map<Integer, SimpleVersionedSerializer<SourceSplit>>
cachedSerializers =
new HashMap<>();
@@ -45,4 +47,8 @@ class SwitchedSources {
public void put(int sourceIndex, Source source) {
sources.put(sourceIndex, Preconditions.checkNotNull(source));
}
+
+ public int getFirstSourceIndex() {
+ return sources.firstKey();
+ }
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 7bcf69c5e72..8b5cb096586 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -182,6 +182,25 @@ public class HybridSourceSplitEnumeratorTest {
Matchers.iterableWithSize(1));
}
+ @Test
+ public void testRestoreEnumeratorAfterFirstSourceWithoutRestoredSplits()
throws Exception {
+ setupEnumeratorAndTriggerSourceSwitch();
+ HybridSourceEnumeratorState enumeratorState =
enumerator.snapshotState(0);
+ MockSplitEnumerator underlyingEnumerator =
getCurrentEnumerator(enumerator);
+ Assert.assertThat(
+ (List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumerator, "splits"),
+ Matchers.iterableWithSize(0));
+ enumerator =
+ (HybridSourceSplitEnumerator)
source.restoreEnumerator(context, enumeratorState);
+ enumerator.start();
+ // subtask starts at -1 since it has no splits after restore
+ enumerator.handleSourceEvent(SUBTASK0, new
SourceReaderFinishedEvent(-1));
+ underlyingEnumerator = getCurrentEnumerator(enumerator);
+ Assert.assertThat(
+ (List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumerator, "splits"),
+ Matchers.iterableWithSize(0));
+ }
+
@Test
public void testDefaultMethodDelegation() throws Exception {
setupEnumeratorAndTriggerSourceSwitch();