AHeise commented on a change in pull request #17111:
URL: https://github.com/apache/flink/pull/17111#discussion_r701310717
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
##########
@@ -18,31 +18,21 @@
package org.apache.flink.connector.base.source.hybrid;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. */
public class HybridSourceEnumeratorStateSerializer
implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {
private static final int CURRENT_VERSION = 0;
- private final Map<Integer, SimpleVersionedSerializer<Object>> cachedSerializers;
- private final Map<Integer, Source> switchedSources;
-
- public HybridSourceEnumeratorStateSerializer(Map<Integer, Source> switchedSources) {
- this.switchedSources = switchedSources;
- this.cachedSerializers = new HashMap<>();
- }
+ public HybridSourceEnumeratorStateSerializer() {}
Review comment:
Much cleaner now!
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
##########
@@ -74,25 +68,38 @@ private HybridSourceSplit deserializeV0(byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
int sourceIndex = in.readInt();
+ String splitId = in.readUTF();
int nestedVersion = in.readInt();
int length = in.readInt();
byte[] splitBytes = new byte[length];
in.readFully(splitBytes);
- SourceSplit split = serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes);
- return new HybridSourceSplit(sourceIndex, split);
+ return new HybridSourceSplit(sourceIndex, splitBytes, nestedVersion, splitId);
}
}
- private SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) {
- return cachedSerializers.computeIfAbsent(
- sourceIndex,
- (k -> {
- Source source =
- Preconditions.checkNotNull(
- switchedSources.get(k),
- "Source for index=%s not available",
- sourceIndex);
- return source.getSplitSerializer();
- }));
+ /** Sources that participated in switching with cached serializers. */
+ public static class SwitchedSources implements Serializable {
Review comment:
I don't see why this needs to be a nested class here.
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java
##########
@@ -147,11 +147,16 @@ public void testHandleSplitRequestAfterSwitchAndReaderReset() {
assertThat(underlyingEnumeratorWrapper.handleSplitRequests, Matchers.emptyIterable());
enumerator.handleSplitRequest(SUBTASK0, "fakehostname");
+ HybridSourceSplitSerializer.SwitchedSources switchedSources =
+ (HybridSourceSplitSerializer.SwitchedSources)
+ Whitebox.getInternalState(enumerator, "switchedSources");
+
Review comment:
Please add a `@VisibleForTesting` package-private getter instead.
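Roughly like this (just a sketch; the getter name and its placement in `HybridSourceSplitEnumerator` are my assumptions, not existing code):

```java
// Hypothetical getter in HybridSourceSplitEnumerator (name is illustrative).
// Requires: import org.apache.flink.annotation.VisibleForTesting;

@VisibleForTesting
HybridSourceSplitSerializer.SwitchedSources getSwitchedSources() {
    return switchedSources;
}
```

The test could then call `enumerator.getSwitchedSources()` directly instead of reaching into internals via `Whitebox`.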
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -92,13 +90,13 @@
private final List<SourceListEntry> sources;
// sources are populated per subtask at switch time
- private final Map<Integer, Source> switchedSources;
+ private final HybridSourceSplitSerializer.SwitchedSources switchedSources;
Review comment:
Yes, please remove it. `switchedSources` acted like a shared cache, which is no longer necessary. (Not sure how I missed that in the initial review; I guess I was too focused on the API.)
This should now just be a field in the enumerator/reader that caches the sources.
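For illustration, roughly what I have in mind (a sketch only; the `recordSwitchedSource` helper and the exact wiring are my assumptions):

```java
// Inside HybridSourceSplitEnumerator (and analogously in the reader):
// a per-instance cache of the sources seen so far, instead of a map shared via HybridSource.
private final Map<Integer, Source> switchedSources = new HashMap<>();

// Hypothetical helper invoked when a source switch is processed.
private void recordSwitchedSource(int sourceIndex, Source source) {
    switchedSources.put(sourceIndex, source);
}
```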
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitSerializer.java
##########
@@ -74,25 +68,38 @@ private HybridSourceSplit deserializeV0(byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
int sourceIndex = in.readInt();
+ String splitId = in.readUTF();
int nestedVersion = in.readInt();
int length = in.readInt();
byte[] splitBytes = new byte[length];
in.readFully(splitBytes);
- SourceSplit split = serializerOf(sourceIndex).deserialize(nestedVersion, splitBytes);
- return new HybridSourceSplit(sourceIndex, split);
+ return new HybridSourceSplit(sourceIndex, splitBytes, nestedVersion, splitId);
}
}
- private SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) {
- return cachedSerializers.computeIfAbsent(
- sourceIndex,
- (k -> {
- Source source =
- Preconditions.checkNotNull(
- switchedSources.get(k),
- "Source for index=%s not available",
- sourceIndex);
- return source.getSplitSerializer();
- }));
+ /** Sources that participated in switching with cached serializers. */
+ public static class SwitchedSources implements Serializable {
+ private final Map<Integer, Source> switchedSources = new HashMap<>();
+ private final Map<Integer, SimpleVersionedSerializer<SourceSplit>> cachedSerializers =
Review comment:
`transient`
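I.e., something along these lines (a sketch under the assumption that the cache can always be rebuilt from `switchedSources`; `serializerOf` mirrors the removed helper):

```java
private final Map<Integer, Source> switchedSources = new HashMap<>();

// Not part of the serialized form: the serializers are derivable from switchedSources.
// transient + non-final so a deserialized instance can lazily rebuild the cache.
private transient Map<Integer, SimpleVersionedSerializer<SourceSplit>> cachedSerializers;

SimpleVersionedSerializer<SourceSplit> serializerOf(int sourceIndex) {
    if (cachedSerializers == null) {
        cachedSerializers = new HashMap<>();
    }
    return cachedSerializers.computeIfAbsent(
            sourceIndex, k -> switchedSources.get(k).getSplitSerializer());
}
```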
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]