AHeise commented on code in PR #192:
URL: 
https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2408038369


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -132,22 +197,43 @@ private static KafkaSourceEnumState 
deserializeAssignedTopicPartitions(
         }
     }
 
-    private static KafkaSourceEnumState 
deserializeTopicPartitionAndAssignmentStatus(
-            byte[] serialized) throws IOException {
+    @VisibleForTesting
+    static byte[] serializeV2(
+            Collection<SplitAndAssignmentStatus> splits, boolean 
initialDiscoveryFinished)
+            throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(splits.size());
+            for (SplitAndAssignmentStatus splitAndAssignmentStatus : splits) {
+                final TopicPartition topicPartition =
+                        splitAndAssignmentStatus.split().getTopicPartition();
+                out.writeUTF(topicPartition.topic());
+                out.writeInt(topicPartition.partition());
+                
out.writeInt(splitAndAssignmentStatus.assignmentStatus().getStatusCode());
+            }
+            out.writeBoolean(initialDiscoveryFinished);
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    private static KafkaSourceEnumState deserializeVersion2(byte[] serialized) 
throws IOException {
 
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 DataInputStream in = new DataInputStream(bais)) {
 
             final int numPartitions = in.readInt();
-            Set<TopicPartitionAndAssignmentStatus> partitions = new 
HashSet<>(numPartitions);
+            Set<SplitAndAssignmentStatus> partitions = new 
HashSet<>(numPartitions);
 
             for (int i = 0; i < numPartitions; i++) {
                 final String topic = in.readUTF();
                 final int partition = in.readInt();
                 final int statusCode = in.readInt();
                 partitions.add(
-                        new TopicPartitionAndAssignmentStatus(
-                                new TopicPartition(topic, partition),
+                        new SplitAndAssignmentStatus(
+                                new KafkaPartitionSplit(
+                                        new TopicPartition(topic, partition),
+                                        DEFAULT_STARTING_OFFSET),

Review Comment:
   Yes, added a new MIGRATED offset to indicate that this needs to be 
initialized on recovery.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to