This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2984d87 [FLINK-24064][connector/common] HybridSource restore from savepoint
2984d87 is described below
commit 2984d87ed7761a6ca345b8b79bcb6dd49db0442b
Author: Thomas Weise <[email protected]>
AuthorDate: Tue Aug 31 05:48:40 2021 -0700
[FLINK-24064][connector/common] HybridSource restore from savepoint
---
.../connector/base/source/hybrid/HybridSource.java | 19 ++----
.../source/hybrid/HybridSourceEnumeratorState.java | 17 ++++--
.../HybridSourceEnumeratorStateSerializer.java | 37 ++----------
.../base/source/hybrid/HybridSourceReader.java | 15 ++---
.../base/source/hybrid/HybridSourceSplit.java | 69 ++++++++++++++++------
.../source/hybrid/HybridSourceSplitEnumerator.java | 48 +++++++++------
.../source/hybrid/HybridSourceSplitSerializer.java | 39 +++---------
.../base/source/hybrid/SwitchedSources.java | 48 +++++++++++++++
.../base/source/hybrid/HybridSourceReaderTest.java | 36 ++++-------
.../hybrid/HybridSourceSplitEnumeratorTest.java | 36 ++++++-----
.../hybrid/HybridSourceSplitSerializerTest.java | 6 +-
.../base/source/reader/mocks/MockBaseSource.java | 3 +-
12 files changed, 202 insertions(+), 171 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index e3d66de..24acb6a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -32,9 +32,7 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/**
* Hybrid source that switches underlying sources based on configured source
chain.
@@ -91,14 +89,11 @@ import java.util.Map;
public class HybridSource<T> implements Source<T, HybridSourceSplit,
HybridSourceEnumeratorState> {
private final List<SourceListEntry> sources;
- // sources are populated per subtask at switch time
- private final Map<Integer, Source> switchedSources;
/** Protected for subclass, use {@link #builder(Source)} to construct
source. */
protected HybridSource(List<SourceListEntry> sources) {
Preconditions.checkArgument(!sources.isEmpty());
this.sources = sources;
- this.switchedSources = new HashMap<>(sources.size());
}
/** Builder for {@link HybridSource}. */
@@ -116,13 +111,13 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourc
@Override
public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext
readerContext)
throws Exception {
- return new HybridSourceReader(readerContext, switchedSources);
+ return new HybridSourceReader(readerContext);
}
@Override
public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState>
createEnumerator(
SplitEnumeratorContext<HybridSourceSplit> enumContext) {
- return new HybridSourceSplitEnumerator(enumContext, sources, 0,
switchedSources, null);
+ return new HybridSourceSplitEnumerator(enumContext, sources, 0, null);
}
@Override
@@ -131,22 +126,18 @@ public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourc
HybridSourceEnumeratorState checkpoint)
throws Exception {
return new HybridSourceSplitEnumerator(
- enumContext,
- sources,
- checkpoint.getCurrentSourceIndex(),
- switchedSources,
- checkpoint.getWrappedState());
+ enumContext, sources, checkpoint.getCurrentSourceIndex(),
checkpoint);
}
@Override
public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
- return new HybridSourceSplitSerializer(switchedSources);
+ return new HybridSourceSplitSerializer();
}
@Override
public SimpleVersionedSerializer<HybridSourceEnumeratorState>
getEnumeratorCheckpointSerializer() {
- return new HybridSourceEnumeratorStateSerializer(switchedSources);
+ return new HybridSourceEnumeratorStateSerializer();
}
/**
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
index 2da99ee..95aadde 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorState.java
@@ -21,18 +21,25 @@ package org.apache.flink.connector.base.source.hybrid;
/** The state of hybrid source enumerator. */
public class HybridSourceEnumeratorState {
private final int currentSourceIndex;
- private final Object wrappedState;
+ private byte[] wrappedStateBytes;
+ private final int wrappedStateSerializerVersion;
- HybridSourceEnumeratorState(int currentSourceIndex, Object wrappedState) {
+ HybridSourceEnumeratorState(
+ int currentSourceIndex, byte[] wrappedStateBytes, int
serializerVersion) {
this.currentSourceIndex = currentSourceIndex;
- this.wrappedState = wrappedState;
+ this.wrappedStateBytes = wrappedStateBytes;
+ this.wrappedStateSerializerVersion = serializerVersion;
}
public int getCurrentSourceIndex() {
return this.currentSourceIndex;
}
- public Object getWrappedState() {
- return wrappedState;
+ public byte[] getWrappedState() {
+ return wrappedStateBytes;
+ }
+
+ public int getWrappedStateSerializerVersion() {
+ return wrappedStateSerializerVersion;
}
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
index 721e010..92c021e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
@@ -18,17 +18,13 @@
package org.apache.flink.connector.base.source.hybrid;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state.
*/
public class HybridSourceEnumeratorStateSerializer
@@ -36,13 +32,7 @@ public class HybridSourceEnumeratorStateSerializer
private static final int CURRENT_VERSION = 0;
- private final Map<Integer, SimpleVersionedSerializer<Object>>
cachedSerializers;
- private final Map<Integer, Source> switchedSources;
-
- public HybridSourceEnumeratorStateSerializer(Map<Integer, Source>
switchedSources) {
- this.switchedSources = switchedSources;
- this.cachedSerializers = new HashMap<>();
- }
+ public HybridSourceEnumeratorStateSerializer() {}
@Override
public int getVersion() {
@@ -54,12 +44,9 @@ public class HybridSourceEnumeratorStateSerializer
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
out.writeInt(enumState.getCurrentSourceIndex());
- SimpleVersionedSerializer<Object> serializer =
- serializerOf(enumState.getCurrentSourceIndex());
- out.writeInt(serializer.getVersion());
- byte[] enumStateBytes =
serializer.serialize(enumState.getWrappedState());
- out.writeInt(enumStateBytes.length);
- out.write(enumStateBytes);
+ out.writeInt(enumState.getWrappedStateSerializerVersion());
+ out.writeInt(enumState.getWrappedState().length);
+ out.write(enumState.getWrappedState());
out.flush();
return baos.toByteArray();
}
@@ -86,21 +73,7 @@ public class HybridSourceEnumeratorStateSerializer
int length = in.readInt();
byte[] nestedBytes = new byte[length];
in.readFully(nestedBytes);
- Object nested =
serializerOf(sourceIndex).deserialize(nestedVersion, nestedBytes);
- return new HybridSourceEnumeratorState(sourceIndex, nested);
+ return new HybridSourceEnumeratorState(sourceIndex, nestedBytes,
nestedVersion);
}
}
-
- private SimpleVersionedSerializer<Object> serializerOf(int sourceIndex) {
- return cachedSerializers.computeIfAbsent(
- sourceIndex,
- (k -> {
- Source source =
- Preconditions.checkNotNull(
- switchedSources.get(k),
- "Source for index=%s not available",
- sourceIndex);
- return source.getEnumeratorCheckpointSerializer();
- }));
- }
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index 28d4011..855f85d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -34,7 +34,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
@@ -55,17 +54,15 @@ import java.util.concurrent.CompletableFuture;
public class HybridSourceReader<T> implements SourceReader<T,
HybridSourceSplit> {
private static final Logger LOG =
LoggerFactory.getLogger(HybridSourceReader.class);
private final SourceReaderContext readerContext;
- private final Map<Integer, Source> switchedSources;
+ private final SwitchedSources switchedSources = new SwitchedSources();
private int currentSourceIndex = -1;
private boolean isFinalSource;
private SourceReader<T, ? extends SourceSplit> currentReader;
private CompletableFuture<Void> availabilityFuture = new
CompletableFuture<>();
private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
- public HybridSourceReader(
- SourceReaderContext readerContext, Map<Integer, Source>
switchedSources) {
+ public HybridSourceReader(SourceReaderContext readerContext) {
this.readerContext = readerContext;
- this.switchedSources = switchedSources;
}
@Override
@@ -117,7 +114,7 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit>
currentReader != null
? currentReader.snapshotState(checkpointId)
: Collections.emptyList();
- return HybridSourceSplit.wrapSplits(currentSourceIndex, state);
+ return HybridSourceSplit.wrapSplits(state, currentSourceIndex,
switchedSources);
}
@Override
@@ -158,7 +155,7 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit>
"Split %s while current source is %s",
split,
currentSourceIndex);
- realSplits.add(split.getWrappedSplit());
+ realSplits.add(HybridSourceSplit.unwrapSplit(split,
switchedSources));
}
currentReader.addSplits((List) realSplits);
}
@@ -224,9 +221,7 @@ public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit>
currentReader);
}
// TODO: track previous readers splits till checkpoint
- Source source =
- Preconditions.checkNotNull(
- switchedSources.get(index), "Source for index=%s not
available", index);
+ Source source = switchedSources.sourceOf(index);
SourceReader<T, ?> reader;
try {
reader = source.createReader(readerContext);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
index 9057aa6..f26bd10 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
@@ -19,33 +19,45 @@
package org.apache.flink.connector.base.source.hybrid;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/** Source split that wraps the actual split type. */
public class HybridSourceSplit implements SourceSplit {
- private final SourceSplit wrappedSplit;
+ private final byte[] wrappedSplitBytes;
+ private final int wrappedSplitSerializerVersion;
private final int sourceIndex;
+ private final String splitId;
- public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
+ public HybridSourceSplit(
+ int sourceIndex, byte[] wrappedSplit, int serializerVersion,
String splitId) {
this.sourceIndex = sourceIndex;
- this.wrappedSplit = wrappedSplit;
+ this.wrappedSplitBytes = wrappedSplit;
+ this.wrappedSplitSerializerVersion = serializerVersion;
+ this.splitId = splitId;
}
public int sourceIndex() {
return this.sourceIndex;
}
- public SourceSplit getWrappedSplit() {
- return wrappedSplit;
+ public byte[] wrappedSplitBytes() {
+ return wrappedSplitBytes;
+ }
+
+ public int wrappedSplitSerializerVersion() {
+ return wrappedSplitSerializerVersion;
}
@Override
public String splitId() {
- return wrappedSplit.splitId();
+ return splitId;
}
@Override
@@ -57,38 +69,59 @@ public class HybridSourceSplit implements SourceSplit {
return false;
}
HybridSourceSplit that = (HybridSourceSplit) o;
- return sourceIndex == that.sourceIndex &&
wrappedSplit.equals(that.wrappedSplit);
+ return sourceIndex == that.sourceIndex
+ && Arrays.equals(wrappedSplitBytes, that.wrappedSplitBytes);
}
@Override
public int hashCode() {
- return Objects.hash(wrappedSplit, sourceIndex);
+ return Objects.hash(wrappedSplitBytes, sourceIndex);
}
@Override
public String toString() {
- return "HybridSourceSplit{"
- + "realSplit="
- + wrappedSplit
- + ", sourceIndex="
- + sourceIndex
- + '}';
+ return "HybridSourceSplit{" + "sourceIndex=" + sourceIndex + ",
splitId=" + splitId + '}';
}
public static List<HybridSourceSplit> wrapSplits(
- int readerIndex, List<? extends SourceSplit> state) {
+ List<? extends SourceSplit> state, int readerIndex,
SwitchedSources switchedSources) {
List<HybridSourceSplit> wrappedSplits = new ArrayList<>(state.size());
for (SourceSplit split : state) {
- wrappedSplits.add(new HybridSourceSplit(readerIndex, split));
+ wrappedSplits.add(wrapSplit(split, readerIndex, switchedSources));
}
return wrappedSplits;
}
- public static List<SourceSplit> unwrapSplits(List<HybridSourceSplit>
splits) {
+ public static HybridSourceSplit wrapSplit(
+ SourceSplit split, int sourceIndex, SwitchedSources
switchedSources) {
+ try {
+ SimpleVersionedSerializer<SourceSplit> serializer =
+ switchedSources.serializerOf(sourceIndex);
+ byte[] serialized = serializer.serialize(split);
+ return new HybridSourceSplit(
+ sourceIndex, serialized, serializer.getVersion(),
split.splitId());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public static List<SourceSplit> unwrapSplits(
+ List<HybridSourceSplit> splits, SwitchedSources switchedSources) {
List<SourceSplit> unwrappedSplits = new ArrayList<>(splits.size());
for (HybridSourceSplit split : splits) {
- unwrappedSplits.add(split.getWrappedSplit());
+ unwrappedSplits.add(unwrapSplit(split, switchedSources));
}
return unwrappedSplits;
}
+
+ public static SourceSplit unwrapSplit(
+ HybridSourceSplit split, SwitchedSources switchedSources) {
+ try {
+ return switchedSources
+ .serializerOf(split.sourceIndex())
+ .deserialize(split.wrappedSplitSerializerVersion(),
split.wrappedSplitBytes());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
index 0f2b036..d27de22 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.Preconditions;
@@ -68,21 +69,21 @@ public class HybridSourceSplitEnumerator
private final SplitEnumeratorContext<HybridSourceSplit> context;
private final List<HybridSource.SourceListEntry> sources;
- private final Map<Integer, Source> switchedSources;
+ private final SwitchedSources switchedSources = new SwitchedSources();
// Splits that have been returned due to subtask reset
private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>>
pendingSplits;
private final Set<Integer> finishedReaders;
private final Map<Integer, Integer> readerSourceIndex;
private int currentSourceIndex;
- private Object restoredEnumeratorState;
+ private HybridSourceEnumeratorState restoredEnumeratorState;
private SplitEnumerator<SourceSplit, Object> currentEnumerator;
+ private SimpleVersionedSerializer<Object>
currentEnumeratorCheckpointSerializer;
public HybridSourceSplitEnumerator(
SplitEnumeratorContext<HybridSourceSplit> context,
List<HybridSource.SourceListEntry> sources,
int initialSourceIndex,
- Map<Integer, Source> switchedSources,
- Object restoredEnumeratorState) {
+ HybridSourceEnumeratorState restoredEnumeratorState) {
Preconditions.checkArgument(initialSourceIndex < sources.size());
this.context = context;
this.sources = sources;
@@ -90,7 +91,6 @@ public class HybridSourceSplitEnumerator
this.pendingSplits = new HashMap<>();
this.finishedReaders = new HashSet<>();
this.readerSourceIndex = new HashMap<>();
- this.switchedSources = switchedSources;
this.restoredEnumeratorState = restoredEnumeratorState;
}
@@ -127,7 +127,8 @@ public class HybridSourceSplitEnumerator
(k, splitsPerSource) -> {
if (k == currentSourceIndex) {
currentEnumerator.addSplitsBack(
-
HybridSourceSplit.unwrapSplits(splitsPerSource), subtaskId);
+
HybridSourceSplit.unwrapSplits(splitsPerSource, switchedSources),
+ subtaskId);
} else {
pendingSplits
.computeIfAbsent(subtaskId, sourceIndex -> new
TreeMap<>())
@@ -144,7 +145,7 @@ public class HybridSourceSplitEnumerator
private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
readerSourceIndex.put(subtaskId, sourceIndex);
- Source source =
Preconditions.checkNotNull(switchedSources.get(sourceIndex));
+ Source source = switchedSources.sourceOf(sourceIndex);
context.sendEventToSourceReader(
subtaskId,
new SwitchSourceEvent(sourceIndex, source, sourceIndex >=
(sources.size() - 1)));
@@ -172,7 +173,11 @@ public class HybridSourceSplitEnumerator
@Override
public HybridSourceEnumeratorState snapshotState(long checkpointId) throws
Exception {
Object enumState = currentEnumerator.snapshotState(checkpointId);
- return new HybridSourceEnumeratorState(currentSourceIndex, enumState);
+ byte[] enumStateBytes =
currentEnumeratorCheckpointSerializer.serialize(enumState);
+ return new HybridSourceEnumeratorState(
+ currentSourceIndex,
+ enumStateBytes,
+ currentEnumeratorCheckpointSerializer.getVersion());
}
@Override
@@ -262,21 +267,22 @@ public class HybridSourceSplitEnumerator
};
Source<?, ? extends SourceSplit, Object> source =
- switchedSources.computeIfAbsent(
- currentSourceIndex,
- k -> {
- return
sources.get(currentSourceIndex).factory.create(switchContext);
- });
+ sources.get(currentSourceIndex).factory.create(switchContext);
switchedSources.put(currentSourceIndex, source);
+ currentEnumeratorCheckpointSerializer =
source.getEnumeratorCheckpointSerializer();
SplitEnumeratorContextProxy delegatingContext =
- new SplitEnumeratorContextProxy(currentSourceIndex, context,
readerSourceIndex);
+ new SplitEnumeratorContextProxy(
+ currentSourceIndex, context, readerSourceIndex,
switchedSources);
try {
if (restoredEnumeratorState == null) {
currentEnumerator = source.createEnumerator(delegatingContext);
} else {
LOG.info("Restoring enumerator for sourceIndex={}",
currentSourceIndex);
- currentEnumerator =
- source.restoreEnumerator(delegatingContext,
restoredEnumeratorState);
+ Object nestedEnumState =
+ currentEnumeratorCheckpointSerializer.deserialize(
+
restoredEnumeratorState.getWrappedStateSerializerVersion(),
+ restoredEnumeratorState.getWrappedState());
+ currentEnumerator =
source.restoreEnumerator(delegatingContext, nestedEnumState);
restoredEnumeratorState = null;
}
} catch (Exception e) {
@@ -301,14 +307,17 @@ public class HybridSourceSplitEnumerator
private final SplitEnumeratorContext<HybridSourceSplit> realContext;
private final int sourceIndex;
private final Map<Integer, Integer> readerSourceIndex;
+ private final SwitchedSources switchedSources;
private SplitEnumeratorContextProxy(
int sourceIndex,
SplitEnumeratorContext<HybridSourceSplit> realContext,
- Map<Integer, Integer> readerSourceIndex) {
+ Map<Integer, Integer> readerSourceIndex,
+ SwitchedSources switchedSources) {
this.realContext = realContext;
this.sourceIndex = sourceIndex;
this.readerSourceIndex = readerSourceIndex;
+ this.switchedSources = switchedSources;
}
@Override
@@ -358,7 +367,7 @@ public class HybridSourceSplitEnumerator
Map<Integer, List<HybridSourceSplit>> wrappedAssignmentMap = new
HashMap<>();
for (Map.Entry<Integer, List<SplitT>> e :
newSplitAssignments.assignment().entrySet()) {
List<HybridSourceSplit> splits =
- HybridSourceSplit.wrapSplits(sourceIndex,
e.getValue());
+ HybridSourceSplit.wrapSplits(e.getValue(),
sourceIndex, switchedSources);
wrappedAssignmentMap.put(e.getKey(), splits);
}
SplitsAssignment<HybridSourceSplit> wrappedAssignments =
@@ -369,7 +378,8 @@ public class HybridSourceSplitEnumerator
@Override
public void assignSplit(SplitT split, int subtask) {
- HybridSourceSplit wrappedSplit = new
HybridSourceSplit(sourceIndex, split);
+ HybridSourceSplit wrappedSplit =
+ HybridSourceSplit.wrapSplit(split, sourceIndex,
switchedSources);
realContext.assignSplit(wrappedSplit, subtask);
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
index 025733c..8fe7b7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
@@ -18,29 +18,18 @@
package org.apache.flink.connector.base.source.hybrid;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
/** Serializes splits by delegating to the source-indexed underlying split
serializer. */
public class HybridSourceSplitSerializer implements
SimpleVersionedSerializer<HybridSourceSplit> {
- final Map<Integer, SimpleVersionedSerializer<SourceSplit>>
cachedSerializers;
- final Map<Integer, Source> switchedSources;
-
- public HybridSourceSplitSerializer(Map<Integer, Source> switchedSources) {
- this.cachedSerializers = new HashMap<>();
- this.switchedSources = switchedSources;
- }
+ public HybridSourceSplitSerializer() {}
@Override
public int getVersion() {
@@ -52,11 +41,10 @@ public class HybridSourceSplitSerializer implements SimpleVersionedSerializer<Hy
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
out.writeInt(split.sourceIndex());
- out.writeInt(serializerOf(split.sourceIndex()).getVersion());
- byte[] serializedSplit =
-
serializerOf(split.sourceIndex()).serialize(split.getWrappedSplit());
- out.writeInt(serializedSplit.length);
- out.write(serializedSplit);
+ out.writeUTF(split.splitId());
+ out.writeInt(split.wrappedSplitSerializerVersion());
+ out.writeInt(split.wrappedSplitBytes().length);
+ out.write(split.wrappedSplitBytes());
out.flush();
return baos.toByteArray();
}
@@ -74,25 +62,12 @@ public class HybridSourceSplitSerializer implements SimpleVersionedSerializer<Hy
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
int sourceIndex = in.readInt();
+ String splitId = in.readUTF();
int nestedVersion = in.readInt();
int length = in.readInt();
byte[] splitBytes = new byte[length];
in.readFully(splitBytes);
- SourceSplit split =
serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes);
- return new HybridSourceSplit(sourceIndex, split);
+ return new HybridSourceSplit(sourceIndex, splitBytes,
nestedVersion, splitId);
}
}
-
- private SimpleVersionedSerializer<SourceSplit> serializerOf(int
sourceIndex) {
- return cachedSerializers.computeIfAbsent(
- sourceIndex,
- (k -> {
- Source source =
- Preconditions.checkNotNull(
- switchedSources.get(k),
- "Source for index=%s not available",
- sourceIndex);
- return source.getSplitSerializer();
- }));
- }
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
new file mode 100644
index 0000000..7911612
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchedSources.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Sources that participated in switching with cached serializers. */
+class SwitchedSources {
+ private final Map<Integer, Source> sources = new HashMap<>();
+ private final Map<Integer, SimpleVersionedSerializer<SourceSplit>>
cachedSerializers =
+ new HashMap<>();
+
+ public Source sourceOf(int sourceIndex) {
+ return Preconditions.checkNotNull(
+ sources.get(sourceIndex), "Source for index=%s not available",
sourceIndex);
+ }
+
+ public SimpleVersionedSerializer<SourceSplit> serializerOf(int
sourceIndex) {
+ return cachedSerializers.computeIfAbsent(
+ sourceIndex, (k -> sourceOf(k).getSplitSerializer()));
+ }
+
+ public void put(int sourceIndex, Source source) {
+ sources.put(sourceIndex, Preconditions.checkNotNull(source));
+ }
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index 7882333..031a735 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -36,9 +36,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/** Tests for {@link HybridSourceReader}. */
public class HybridSourceReaderTest {
@@ -55,10 +53,7 @@ public class HybridSourceReaderTest {
SourceReader<Integer, MockSourceSplit> mockSplitReader2 =
source.createReader(readerContext);
- Map<Integer, Source> switchedSources = new HashMap<>();
-
- HybridSourceReader<Integer> reader =
- new HybridSourceReader<>(readerContext, switchedSources);
+ HybridSourceReader<Integer> reader = new
HybridSourceReader<>(readerContext);
Assert.assertThat(readerContext.getSentEvents(),
Matchers.emptyIterable());
reader.start();
@@ -75,10 +70,13 @@ public class HybridSourceReaderTest {
}
};
reader.handleSourceEvents(new SwitchSourceEvent(0, source1, false));
- Assert.assertEquals(source1, switchedSources.get(0));
+
MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 1);
mockSplit.addRecord(0);
- HybridSourceSplit hybridSplit = new HybridSourceSplit(0, mockSplit);
+
+ SwitchedSources switchedSources = new SwitchedSources();
+ switchedSources.put(0, source);
+ HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit,
0, switchedSources);
reader.addSplits(Collections.singletonList(hybridSplit));
// drain splits
@@ -128,19 +126,17 @@ public class HybridSourceReaderTest {
TestingReaderOutput<Integer> readerOutput = new
TestingReaderOutput<>();
MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED);
- Map<Integer, Source> switchedSources = new HashMap<>();
-
- HybridSourceReader<Integer> reader =
- new HybridSourceReader<>(readerContext, switchedSources);
+ HybridSourceReader<Integer> reader = new
HybridSourceReader<>(readerContext);
reader.start();
assertAndClearSourceReaderFinishedEvent(readerContext, -1);
reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
- Assert.assertEquals(source, switchedSources.get(0));
MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647);
- // mockSplit.addRecord(0);
- HybridSourceSplit hybridSplit = new HybridSourceSplit(0, mockSplit);
+
+ SwitchedSources switchedSources = new SwitchedSources();
+ switchedSources.put(0, source);
+ HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit,
0, switchedSources);
reader.addSplits(Collections.singletonList(hybridSplit));
List<HybridSourceSplit> snapshot = reader.snapshotState(0);
@@ -148,8 +144,7 @@ public class HybridSourceReaderTest {
// reader recovery
readerContext.clearSentEvents();
- switchedSources = new HashMap<>();
- reader = new HybridSourceReader<>(readerContext, switchedSources);
+ reader = new HybridSourceReader<>(readerContext);
reader.addSplits(snapshot);
Assert.assertNull(currentReader(reader));
@@ -160,7 +155,6 @@ public class HybridSourceReaderTest {
assertAndClearSourceReaderFinishedEvent(readerContext, -1);
reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
Assert.assertNotNull(currentReader(reader));
- Assert.assertEquals(source, switchedSources.get(0));
Assert.assertThat(reader.snapshotState(1),
Matchers.contains(hybridSplit));
reader.close();
@@ -179,15 +173,11 @@ public class HybridSourceReaderTest {
}
};
- Map<Integer, Source> switchedSources = new HashMap<>();
-
- HybridSourceReader<Integer> reader =
- new HybridSourceReader<>(readerContext, switchedSources);
+ HybridSourceReader<Integer> reader = new
HybridSourceReader<>(readerContext);
reader.start();
assertAndClearSourceReaderFinishedEvent(readerContext, -1);
reader.handleSourceEvents(new SwitchSourceEvent(0, source, false));
- Assert.assertEquals(source, switchedSources.get(0));
SourceReader<Integer, MockSourceSplit> underlyingReader =
currentReader(reader);
reader.notifyCheckpointComplete(1);
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
index 59a1836..7bcf69c 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
@@ -47,6 +47,7 @@ public class HybridSourceSplitEnumeratorTest {
private static final int SUBTASK0 = 0;
private static final int SUBTASK1 = 1;
+ private static final MockBaseSource MOCK_SOURCE = new MockBaseSource(1, 1,
Boundedness.BOUNDED);
private HybridSource<Integer> source;
private MockSplitEnumeratorContext<HybridSourceSplit> context;
@@ -56,10 +57,7 @@ public class HybridSourceSplitEnumeratorTest {
private void setupEnumeratorAndTriggerSourceSwitch() {
context = new MockSplitEnumeratorContext<>(2);
- source =
- HybridSource.builder(new MockBaseSource(1, 1,
Boundedness.BOUNDED))
- .addSource(new MockBaseSource(1, 1,
Boundedness.BOUNDED))
- .build();
+ source =
HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
enumerator = (HybridSourceSplitEnumerator)
source.createEnumerator(context);
enumerator.start();
@@ -130,9 +128,7 @@ public class HybridSourceSplitEnumeratorTest {
setupEnumeratorAndTriggerSourceSwitch();
UnderlyingEnumeratorWrapper underlyingEnumeratorWrapper =
- new UnderlyingEnumeratorWrapper(
- (MockSplitEnumerator)
- Whitebox.getInternalState(enumerator,
"currentEnumerator"));
+ new
UnderlyingEnumeratorWrapper(getCurrentEnumerator(enumerator));
Whitebox.setInternalState(enumerator, "currentEnumerator",
underlyingEnumeratorWrapper);
List<MockSourceSplit> mockSourceSplits =
@@ -147,11 +143,15 @@ public class HybridSourceSplitEnumeratorTest {
assertThat(underlyingEnumeratorWrapper.handleSplitRequests,
Matchers.emptyIterable());
enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
+ SwitchedSources switchedSources = new SwitchedSources();
+ switchedSources.put(1, MOCK_SOURCE);
+
assertSplitAssignment(
"handleSplitRequest triggers assignment of split by underlying
enumerator",
context,
1,
- new HybridSourceSplit(1, UnderlyingEnumeratorWrapper.SPLIT_1),
+ HybridSourceSplit.wrapSplit(
+ UnderlyingEnumeratorWrapper.SPLIT_1, 1,
switchedSources),
SUBTASK0);
// handleSplitRequest invalid during reset
@@ -169,21 +169,24 @@ public class HybridSourceSplitEnumeratorTest {
enumerator = (HybridSourceSplitEnumerator)
source.createEnumerator(context);
enumerator.start();
HybridSourceEnumeratorState enumeratorState =
enumerator.snapshotState(0);
- Assert.assertEquals(1, ((List)
enumeratorState.getWrappedState()).size());
+ MockSplitEnumerator underlyingEnumerator =
getCurrentEnumerator(enumerator);
+ Assert.assertThat(
+ (List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumerator, "splits"),
+ Matchers.iterableWithSize(1));
enumerator =
(HybridSourceSplitEnumerator)
source.restoreEnumerator(context, enumeratorState);
enumerator.start();
- enumeratorState = enumerator.snapshotState(0);
- Assert.assertEquals(1, ((List)
enumeratorState.getWrappedState()).size());
+ underlyingEnumerator = getCurrentEnumerator(enumerator);
+ Assert.assertThat(
+ (List<MockSourceSplit>)
Whitebox.getInternalState(underlyingEnumerator, "splits"),
+ Matchers.iterableWithSize(1));
}
@Test
public void testDefaultMethodDelegation() throws Exception {
setupEnumeratorAndTriggerSourceSwitch();
SplitEnumerator<MockSourceSplit, Object> underlyingEnumeratorSpy =
- Mockito.spy(
- (SplitEnumerator<MockSourceSplit, Object>)
- Whitebox.getInternalState(enumerator,
"currentEnumerator"));
+ Mockito.spy((SplitEnumerator)
getCurrentEnumerator(enumerator));
Whitebox.setInternalState(enumerator, "currentEnumerator",
underlyingEnumeratorSpy);
enumerator.notifyCheckpointComplete(1);
@@ -270,4 +273,9 @@ public class HybridSourceSplitEnumeratorTest {
private static int getCurrentSourceIndex(HybridSourceSplitEnumerator
enumerator) {
return (int) Whitebox.getInternalState(enumerator,
"currentSourceIndex");
}
+
+ private static MockSplitEnumerator getCurrentEnumerator(
+ HybridSourceSplitEnumerator enumerator) {
+ return (MockSplitEnumerator) Whitebox.getInternalState(enumerator,
"currentEnumerator");
+ }
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
index d43275f..e2db86e 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.base.source.hybrid;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
-import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.junit.Assert;
import org.junit.Test;
@@ -36,8 +35,9 @@ public class HybridSourceSplitSerializerTest {
public void testSerialization() throws Exception {
Map<Integer, Source> switchedSources = new HashMap<>();
switchedSources.put(0, new MockSource(null, 0));
- HybridSourceSplitSerializer serializer = new
HybridSourceSplitSerializer(switchedSources);
- HybridSourceSplit split = new HybridSourceSplit(0, new
MockSourceSplit(1));
+ byte[] splitBytes = {1, 2, 3};
+ HybridSourceSplitSerializer serializer = new
HybridSourceSplitSerializer();
+ HybridSourceSplit split = new HybridSourceSplit(0, splitBytes, 0,
"splitId");
byte[] serialized = serializer.serialize(split);
HybridSourceSplit clonedSplit = serializer.deserialize(0, serialized);
Assert.assertEquals(split, clonedSplit);
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index bd01adf..ff3e80e 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -117,7 +117,8 @@ public class MockBaseSource implements Source<Integer,
MockSourceSplit, List<Moc
@Override
public byte[] serialize(List<MockSourceSplit> obj) throws
IOException {
- return InstantiationUtil.serializeObject(obj.toArray());
+ return InstantiationUtil.serializeObject(
+ obj.toArray(new MockSourceSplit[obj.size()]));
}
@Override