RamanVerma commented on code in PR #28:
URL:
https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1190320189
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -56,54 +58,75 @@ public int getVersion() {
@Override
public byte[] serialize(KafkaSourceEnumState enumState) throws IOException
{
- return serializeTopicPartitions(enumState.assignedPartitions());
+
+ Set<TopicPartition> assignedPartitions =
enumState.assignedPartitions();
+ Set<TopicPartition> unassignedInitialPartitons =
enumState.unassignedInitialPartitons();
+ boolean initialDiscoveryFinished =
enumState.initialDiscoveryFinished();
+ return serializeTopicPartitions(
+ assignedPartitions, unassignedInitialPartitons,
initialDiscoveryFinished);
}
@Override
public KafkaSourceEnumState deserialize(int version, byte[] serialized)
throws IOException {
- if (version == CURRENT_VERSION) {
- final Set<TopicPartition> assignedPartitions =
deserializeTopicPartitions(serialized);
- return new KafkaSourceEnumState(assignedPartitions);
- }
-
- // Backward compatibility
- if (version == VERSION_0) {
- Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
- SerdeUtils.deserializeSplitAssignments(
- serialized, new KafkaPartitionSplitSerializer(),
HashSet::new);
- Set<TopicPartition> currentAssignedSplits = new HashSet<>();
- currentPartitionAssignment.forEach(
- (reader, splits) ->
- splits.forEach(
- split ->
currentAssignedSplits.add(split.getTopicPartition())));
- return new KafkaSourceEnumState(currentAssignedSplits);
+ switch (version) {
+ case CURRENT_VERSION:
+ return deserializeTopicPartitionsV2(serialized);
+ case VERSION_1:
+ final Set<TopicPartition> assignedPartitions =
+ deserializeTopicPartitionsV1(serialized);
+ return new KafkaSourceEnumState(assignedPartitions, new
HashSet<>(), true);
+ case VERSION_0:
+ Map<Integer, Set<KafkaPartitionSplit>>
currentPartitionAssignment =
+ SerdeUtils.deserializeSplitAssignments(
+ serialized, new
KafkaPartitionSplitSerializer(), HashSet::new);
+ Set<TopicPartition> currentAssignedSplits = new HashSet<>();
+ currentPartitionAssignment.forEach(
+ (reader, splits) ->
+ splits.forEach(
+ split ->
+ currentAssignedSplits.add(
+
split.getTopicPartition())));
+ return new KafkaSourceEnumState(currentAssignedSplits, new
HashSet<>(), true);
+ default:
+ throw new IOException(
+ String.format(
+ "The bytes are serialized with version %d, "
+ + "while this deserializer only
supports version up to %d",
+ version, CURRENT_VERSION));
}
-
- throw new IOException(
- String.format(
- "The bytes are serialized with version %d, "
- + "while this deserializer only supports
version up to %d",
- version, CURRENT_VERSION));
}
- private static byte[] serializeTopicPartitions(Collection<TopicPartition>
topicPartitions)
+ private static byte[] serializeTopicPartitions(
+ Collection<TopicPartition> assignedPartitions,
+ Collection<TopicPartition> unassignedInitialPartitons,
Review Comment:
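Typo: `unassignedInitialPartitons` -> `unassignedInitialPartitions`. The same misspelling appears in the `unassignedInitialPartitons()` accessor and the local variable in `serialize`.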
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -56,54 +58,75 @@ public int getVersion() {
@Override
public byte[] serialize(KafkaSourceEnumState enumState) throws IOException
{
- return serializeTopicPartitions(enumState.assignedPartitions());
+
+ Set<TopicPartition> assignedPartitions =
enumState.assignedPartitions();
+ Set<TopicPartition> unassignedInitialPartitons =
enumState.unassignedInitialPartitons();
+ boolean initialDiscoveryFinished =
enumState.initialDiscoveryFinished();
+ return serializeTopicPartitions(
Review Comment:
Maybe we can get rid of the private method now. It serializes more than just topic partitions (`initialDiscoveryFinished` is a boolean), so its name would need to change anyway. It also has no other caller. Let's just do everything in the `serialize` method itself.
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -121,4 +144,54 @@ private static Set<TopicPartition>
deserializeTopicPartitions(byte[] serializedT
return topicPartitions;
}
}
+
+ private static KafkaSourceEnumState deserializeTopicPartitionsV2(
+ byte[] serializedTopicPartitions) throws IOException {
+
+ try (ByteArrayInputStream bais = new
ByteArrayInputStream(serializedTopicPartitions);
+ DataInputStream in = new DataInputStream(bais)) {
+
+ final int numAssignedPartitions = in.readInt();
+ Set<TopicPartition> assignedPartitions = new
HashSet<>(numAssignedPartitions);
+ for (int i = 0; i < numAssignedPartitions; i++) {
+ final String topic = in.readUTF();
+ final int partition = in.readInt();
+ assignedPartitions.add(new TopicPartition(topic, partition));
+ }
+ final int numUnassignedInitialPartitons = in.readInt();
Review Comment:
Typo: `numUnassignedInitialPartitons` -> `numUnassignedInitialPartitions`. The same typo appears on line 162.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]