StephanEwen commented on a change in pull request #15397:
URL: https://github.com/apache/flink/pull/15397#discussion_r603395847
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
##########
@@ -130,4 +130,81 @@ private SerdeUtils() {}
return splitsAssignments;
}
}
+
+ /**
+ * Serialize a collection of splits. The serialized format is following:
+ *
+ * <pre>
+ * 4 bytes - number of split in collection
+ * 4 bytes - split serializer version
+ * N bytes - [splits_in_collection]
+ * 4 bytes - serialized split length
+ * N bytes - serialized splits
+ * </pre>
+ *
+ * @param splitCollection The collection of splits
+ * @param splitSerializer the serializer of the split.
+ * @param <SplitT> the type of the splits.
+ * @param <C> the type of the collection to hold the assigned splits for a subtask.
+ * @return the serialized bytes of the given subtask to splits assignment mapping.
+ * @throws IOException when serialization failed.
+ */
+ public static <SplitT extends SourceSplit, C extends Collection<SplitT>>
+ byte[] serializeSplitCollection(
+ C splitCollection, SimpleVersionedSerializer<SplitT> splitSerializer)
Review comment:
This generic `C extends Collection<SplitT>` is unnecessary. You can pass
subclasses of `Collection` anyway, even if you declare the parameter
directly as `Collection<SplitT>`.
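A minimal sketch of the suggested signature, not the code from the PR. It assumes simplified stand-ins for Flink's `SourceSplit` and `SimpleVersionedSerializer` interfaces (only the methods used here), plus a hypothetical `StringSplit` type and `SplitSerdeSketch` class for illustration. With the parameter declared as `Collection<SplitT>`, callers can still pass a `List`, a `HashSet`, or any other `Collection` subtype, and the written layout is the one the Javadoc describes: split count, serializer version, then a 4-byte length prefix followed by the payload for each split.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;

public class SplitSerdeSketch {

    /** Stand-in for Flink's SourceSplit; only splitId() is modeled. */
    public interface SourceSplit {
        String splitId();
    }

    /** Stand-in for Flink's SimpleVersionedSerializer; only the write side is modeled. */
    public interface SimpleVersionedSerializer<E> {
        int getVersion();

        byte[] serialize(E obj) throws IOException;
    }

    /** Hypothetical split whose serialized form is the UTF-8 bytes of its ID. */
    public static final class StringSplit implements SourceSplit {
        private final String id;

        public StringSplit(String id) {
            this.id = id;
        }

        @Override
        public String splitId() {
            return id;
        }
    }

    /** Hypothetical serializer for StringSplit, reporting version 1. */
    public static final SimpleVersionedSerializer<StringSplit> STRING_SPLIT_SERIALIZER =
            new SimpleVersionedSerializer<StringSplit>() {
                @Override
                public int getVersion() {
                    return 1;
                }

                @Override
                public byte[] serialize(StringSplit split) {
                    return split.splitId().getBytes(StandardCharsets.UTF_8);
                }
            };

    /** No extra type variable C: any subtype of Collection<SplitT> is accepted. */
    public static <SplitT extends SourceSplit> byte[] serializeSplitCollection(
            Collection<SplitT> splitCollection, SimpleVersionedSerializer<SplitT> splitSerializer)
            throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(splitCollection.size()); // 4 bytes: number of splits
            out.writeInt(splitSerializer.getVersion()); // 4 bytes: serializer version
            for (SplitT split : splitCollection) {
                final byte[] serializedSplit = splitSerializer.serialize(split);
                out.writeInt(serializedSplit.length); // 4 bytes: length of this split
                out.write(serializedSplit); // N bytes: the split itself
            }
            out.flush(); // make sure everything reached baos before copying it
            return baos.toByteArray();
        }
    }
}
```

For a list of two splits with 1-byte and 2-byte payloads, this yields 4 + 4 + (4 + 1) + (4 + 2) = 19 bytes; a `HashSet` of splits is accepted by the same method without any extra type parameter.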
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/utils/SerdeUtils.java
##########
@@ -130,4 +130,81 @@ private SerdeUtils() {}
return splitsAssignments;
}
}
+
+ /**
+ * Serialize a collection of splits. The serialized format is following:
+ *
+ * <pre>
+ * 4 bytes - number of split in collection
+ * 4 bytes - split serializer version
+ * N bytes - [splits_in_collection]
+ * 4 bytes - serialized split length
+ * N bytes - serialized splits
+ * </pre>
+ *
+ * @param splitCollection The collection of splits
+ * @param splitSerializer the serializer of the split.
+ * @param <SplitT> the type of the splits.
+ * @param <C> the type of the collection to hold the assigned splits for a subtask.
+ * @return the serialized bytes of the given subtask to splits assignment mapping.
+ * @throws IOException when serialization failed.
+ */
+ public static <SplitT extends SourceSplit, C extends Collection<SplitT>>
+ byte[] serializeSplitCollection(
+ C splitCollection, SimpleVersionedSerializer<SplitT> splitSerializer)
+ throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+
+ // Number of split in the collection
+ out.writeInt(splitCollection.size());
+ // Split serializer version.
+ out.writeInt(splitSerializer.getVersion());
+ // Write splits in the collection
+ for (SplitT split : splitCollection) {
+ final byte[] serializedSplit = splitSerializer.serialize(split);
+ out.writeInt(serializedSplit.length);
+ out.write(serializedSplit);
+ }
+ return baos.toByteArray();
+ }
+ }
+
+ /**
+ * Deserialize the given bytes returned by {@link #serializeSplitCollection(C,
+ * SimpleVersionedSerializer)}.
+ *
+ * @param serialized the serialized bytes returned by {@link #serializeSplitCollection(C,
+ * SimpleVersionedSerializer)}.
+ * @param splitSerializer the split serializer for the splits.
+ * @param collectionSupplier the supplier for the {@link Collection} instance to hold splits.
+ * @param <SplitT> the type of the splits.
+ * @param <C> the type of the collection to hold splits.
+ * @return A collection of splits.
+ * @throws IOException when deserialization failed.
+ */
+ public static <SplitT extends SourceSplit, C extends Collection<SplitT>>
+ C deserializeSplitCollection(
+ byte[] serialized,
+ SimpleVersionedSerializer<SplitT> splitSerializer,
+ Function<Integer, C> collectionSupplier)
Review comment:
This seems correct, but complex, given that it always creates a `Set`
anyway.
Why not just always return a `Set` from the method (and create a `HashSet`
internally)?
You could also return a `Collection`; see the high-level comments.
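A minimal sketch of the alternative suggested above, not the code from the PR: the deserializer always builds a `HashSet` and returns `Set<SplitT>`, so the `Function<Integer, C>` parameter disappears. To stay self-contained it assumes a stand-in for Flink's `SimpleVersionedSerializer` (modeling `getVersion`, `serialize`, and `deserialize(int, byte[])`), drops the `SourceSplit` bound, and uses a hypothetical `ID_SERIALIZER` that treats each split as a plain string ID; `SplitSetSerdeSketch` is likewise a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class SplitSetSerdeSketch {

    /** Stand-in for Flink's SimpleVersionedSerializer (write and read side). */
    public interface SimpleVersionedSerializer<E> {
        int getVersion();

        byte[] serialize(E obj) throws IOException;

        E deserialize(int version, byte[] serialized) throws IOException;
    }

    /** Hypothetical serializer that treats a split as its UTF-8 encoded ID string. */
    public static final SimpleVersionedSerializer<String> ID_SERIALIZER =
            new SimpleVersionedSerializer<String>() {
                @Override
                public int getVersion() {
                    return 1;
                }

                @Override
                public byte[] serialize(String splitId) {
                    return splitId.getBytes(StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(int version, byte[] serialized) {
                    // Only version 1 exists in this sketch, so the version is not inspected.
                    return new String(serialized, StandardCharsets.UTF_8);
                }
            };

    /** Writes: split count, serializer version, then (length, bytes) per split. */
    public static <SplitT> byte[] serializeSplitCollection(
            Collection<SplitT> splits, SimpleVersionedSerializer<SplitT> splitSerializer)
            throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(splits.size());
            out.writeInt(splitSerializer.getVersion());
            for (SplitT split : splits) {
                final byte[] serializedSplit = splitSerializer.serialize(split);
                out.writeInt(serializedSplit.length);
                out.write(serializedSplit);
            }
            out.flush();
            return baos.toByteArray();
        }
    }

    /** Always materializes a HashSet, so no collection-supplier parameter is needed. */
    public static <SplitT> Set<SplitT> deserializeSplitCollection(
            byte[] serialized, SimpleVersionedSerializer<SplitT> splitSerializer)
            throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            final int numSplits = in.readInt();
            final int serializerVersion = in.readInt(); // version written at serialization time
            final Set<SplitT> splits = new HashSet<>(numSplits);
            for (int i = 0; i < numSplits; i++) {
                final byte[] serializedSplit = new byte[in.readInt()];
                in.readFully(serializedSplit);
                splits.add(splitSerializer.deserialize(serializerVersion, serializedSplit));
            }
            return splits;
        }
    }
}
```

One consequence of this design: because the result is a `HashSet`, iteration order is unspecified and splits that are equal by `equals` collapse into a single element.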
--
This is an automated message from the Apache Git Service.