tweise commented on a change in pull request #17111:
URL: https://github.com/apache/flink/pull/17111#discussion_r701426261
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
##########
@@ -74,25 +68,38 @@ private HybridSourceSplit deserializeV0(byte[] serialized)
throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
int sourceIndex = in.readInt();
+ String splitId = in.readUTF();
int nestedVersion = in.readInt();
int length = in.readInt();
byte[] splitBytes = new byte[length];
in.readFully(splitBytes);
- SourceSplit split =
serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes);
- return new HybridSourceSplit(sourceIndex, split);
+ return new HybridSourceSplit(sourceIndex, splitBytes,
nestedVersion, splitId);
}
}
- private SimpleVersionedSerializer<SourceSplit> serializerOf(int
sourceIndex) {
- return cachedSerializers.computeIfAbsent(
- sourceIndex,
- (k -> {
- Source source =
- Preconditions.checkNotNull(
- switchedSources.get(k),
- "Source for index=%s not available",
- sourceIndex);
- return source.getSplitSerializer();
- }));
+ /** Sources that participated in switching with cached serializers. */
+ public static class SwitchedSources implements Serializable {
+ private final Map<Integer, Source> switchedSources = new HashMap<>();
+ private final Map<Integer, SimpleVersionedSerializer<SourceSplit>>
cachedSerializers =
Review comment:
Correct, no longer required after removing from `HybridSource`!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]