RamanVerma commented on code in PR #28:
URL: https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1190320189


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -56,54 +58,75 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return serializeTopicPartitions(enumState.assignedPartitions());
+
+        Set<TopicPartition> assignedPartitions = enumState.assignedPartitions();
+        Set<TopicPartition> unassignedInitialPartitons = enumState.unassignedInitialPartitons();
+        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
+        return serializeTopicPartitions(
+                assignedPartitions, unassignedInitialPartitons, initialDiscoveryFinished);
     }
 
     @Override
     public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
-        if (version == CURRENT_VERSION) {
-            final Set<TopicPartition> assignedPartitions = deserializeTopicPartitions(serialized);
-            return new KafkaSourceEnumState(assignedPartitions);
-        }
-
-        // Backward compatibility
-        if (version == VERSION_0) {
-            Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
-                    SerdeUtils.deserializeSplitAssignments(
-                            serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
-            Set<TopicPartition> currentAssignedSplits = new HashSet<>();
-            currentPartitionAssignment.forEach(
-                    (reader, splits) ->
-                            splits.forEach(
-                                    split -> currentAssignedSplits.add(split.getTopicPartition())));
-            return new KafkaSourceEnumState(currentAssignedSplits);
+        switch (version) {
+            case CURRENT_VERSION:
+                return deserializeTopicPartitionsV2(serialized);
+            case VERSION_1:
+                final Set<TopicPartition> assignedPartitions =
+                        deserializeTopicPartitionsV1(serialized);
+                return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true);
+            case VERSION_0:
+                Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
+                        SerdeUtils.deserializeSplitAssignments(
+                                serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
+                Set<TopicPartition> currentAssignedSplits = new HashSet<>();
+                currentPartitionAssignment.forEach(
+                        (reader, splits) ->
+                                splits.forEach(
+                                        split ->
+                                                currentAssignedSplits.add(
+                                                        split.getTopicPartition())));
+                return new KafkaSourceEnumState(currentAssignedSplits, new HashSet<>(), true);
+            default:
+                throw new IOException(
+                        String.format(
+                                "The bytes are serialized with version %d, "
+                                        + "while this deserializer only supports version up to %d",
+                                version, CURRENT_VERSION));
         }
-
-        throw new IOException(
-                String.format(
-                        "The bytes are serialized with version %d, "
-                                + "while this deserializer only supports version up to %d",
-                        version, CURRENT_VERSION));
     }
 
-    private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions)
+    private static byte[] serializeTopicPartitions(
+            Collection<TopicPartition> assignedPartitions,
+            Collection<TopicPartition> unassignedInitialPartitons,

Review Comment:
   Typo: `unassignedInitialPartitons` -> `unassignedInitialPartitions`



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -56,54 +58,75 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return serializeTopicPartitions(enumState.assignedPartitions());
+
+        Set<TopicPartition> assignedPartitions = enumState.assignedPartitions();
+        Set<TopicPartition> unassignedInitialPartitons = enumState.unassignedInitialPartitons();
+        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
+        return serializeTopicPartitions(

Review Comment:
   Maybe we can get rid of the private method now.
   We are serializing more than just topic partitions (`initialDiscoveryFinished`
   is a boolean), so the method name would have to change anyway. It also has no
   other caller, so let's do everything in the `serialize` method itself.
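
   To illustrate the suggestion, here is a minimal, self-contained sketch of an inlined `serialize` with a matching reader. It is not the PR's code: `EnumStateSerdeSketch`, its nested `TopicPartition` and `EnumState` classes are hypothetical stand-ins for the Kafka and Flink types, and writing the boolean last is an assumption, since the quoted hunks do not show where `initialDiscoveryFinished` sits in the byte layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class EnumStateSerdeSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    public static final class TopicPartition {
        public final String topic;
        public final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Hypothetical stand-in for KafkaSourceEnumState. */
    public static final class EnumState {
        public final Set<TopicPartition> assignedPartitions;
        public final Set<TopicPartition> unassignedInitialPartitions;
        public final boolean initialDiscoveryFinished;

        public EnumState(
                Set<TopicPartition> assignedPartitions,
                Set<TopicPartition> unassignedInitialPartitions,
                boolean initialDiscoveryFinished) {
            this.assignedPartitions = assignedPartitions;
            this.unassignedInitialPartitions = unassignedInitialPartitions;
            this.initialDiscoveryFinished = initialDiscoveryFinished;
        }
    }

    /** Everything is written inline, with no separate private helper. */
    public static byte[] serialize(EnumState state) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            // Each set is written as a count followed by (topic, partition) pairs.
            out.writeInt(state.assignedPartitions.size());
            for (TopicPartition tp : state.assignedPartitions) {
                out.writeUTF(tp.topic);
                out.writeInt(tp.partition);
            }
            out.writeInt(state.unassignedInitialPartitions.size());
            for (TopicPartition tp : state.unassignedInitialPartitions) {
                out.writeUTF(tp.topic);
                out.writeInt(tp.partition);
            }
            // Assumption: the flag comes last in the layout.
            out.writeBoolean(state.initialDiscoveryFinished);
            out.flush();
            return baos.toByteArray();
        }
    }

    /** Reads the fields back in the same order that serialize wrote them. */
    public static EnumState deserialize(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            Set<TopicPartition> assigned = new HashSet<>();
            int numAssigned = in.readInt();
            for (int i = 0; i < numAssigned; i++) {
                String topic = in.readUTF();
                assigned.add(new TopicPartition(topic, in.readInt()));
            }
            Set<TopicPartition> unassignedInitial = new HashSet<>();
            int numUnassignedInitial = in.readInt();
            for (int i = 0; i < numUnassignedInitial; i++) {
                String topic = in.readUTF();
                unassignedInitial.add(new TopicPartition(topic, in.readInt()));
            }
            return new EnumState(assigned, unassignedInitial, in.readBoolean());
        }
    }
}
```

   The two write loops are duplicated on purpose to keep everything in one method; if that reads as too repetitive, a small helper that writes one collection would be a reasonable middle ground.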



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -121,4 +144,54 @@ private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedT
             return topicPartitions;
         }
     }
+
+    private static KafkaSourceEnumState deserializeTopicPartitionsV2(
+            byte[] serializedTopicPartitions) throws IOException {
+
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);
+                DataInputStream in = new DataInputStream(bais)) {
+
+            final int numAssignedPartitions = in.readInt();
+            Set<TopicPartition> assignedPartitions = new HashSet<>(numAssignedPartitions);
+            for (int i = 0; i < numAssignedPartitions; i++) {
+                final String topic = in.readUTF();
+                final int partition = in.readInt();
+                assignedPartitions.add(new TopicPartition(topic, partition));
+            }
+            final int numUnassignedInitialPartitons = in.readInt();

Review Comment:
   Typo: `numUnassignedInitialPartitons` -> `numUnassignedInitialPartitions`. The same typo appears on line 162.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
