PatrickRen commented on code in PR #28:
URL: https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1244818295


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -56,69 +67,114 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return serializeTopicPartitions(enumState.assignedPartitions());
+        Set<TopicPartitionAndAssignmentStatus> partitions = enumState.partitions();
+        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(partitions.size());
+            for (TopicPartitionAndAssignmentStatus topicPartitionAndAssignmentStatus : partitions) {
+                out.writeUTF(topicPartitionAndAssignmentStatus.topicPartition().topic());
+                out.writeInt(topicPartitionAndAssignmentStatus.topicPartition().partition());
+                out.writeInt(topicPartitionAndAssignmentStatus.assignmentStatus().getStatusCode());
+            }
+            out.flush();
+            out.writeBoolean(initialDiscoveryFinished);
+            out.flush();

Review Comment:
   Is there any specific reason to flush twice here?
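   For illustration, here is a minimal sketch of the same layout written with a single flush. `PartitionEntry` and `EnumStateWriter` are hypothetical stand-ins for `TopicPartitionAndAssignmentStatus` and the serializer, not code from this PR. `DataOutputStream` writes straight through to the underlying `ByteArrayOutputStream`, so one `flush()` before `toByteArray()` should be enough.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for TopicPartitionAndAssignmentStatus.
final class PartitionEntry {
    final String topic;
    final int partition;
    final int statusCode;

    PartitionEntry(String topic, int partition, int statusCode) {
        this.topic = topic;
        this.partition = partition;
        this.statusCode = statusCode;
    }
}

final class EnumStateWriter {
    // Layout: partition count, then (topic, partition, status code) per entry, then the flag.
    static byte[] serialize(List<PartitionEntry> partitions, boolean initialDiscoveryFinished)
            throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(partitions.size());
            for (PartitionEntry p : partitions) {
                out.writeUTF(p.topic);
                out.writeInt(p.partition);
                out.writeInt(p.statusCode);
            }
            out.writeBoolean(initialDiscoveryFinished);
            // DataOutputStream does not buffer, so a single flush before reading
            // the byte array is sufficient.
            out.flush();
            return baos.toByteArray();
        }
    }
}
```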



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator;
+
+/** status of partition assignment. */
+public enum AssignmentStatus {
+
+    /** Partitions that have been assigned to readers. */
+    ASSIGNED(0),
+    /**
+     * The partitions that have been discovered during initialization but not assigned to readers
+     * yet.
+     */
+    UNASSIGNED_INITIAL(1);
+    private int statusCode;

Review Comment:
   This field should be `private final int statusCode;` since it is only assigned in the constructor.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -88,6 +95,8 @@ public class KafkaSourceEnumerator
     // This flag will be marked as true if periodically partition discovery is disabled AND the
     // initializing partition discovery has finished.
     private boolean noMoreNewPartitionSplits = false;
+    // this flag will be marked as true if inital partitions are discovered after enumerator starts

Review Comment:
   typo: `inital` -> `initial` 



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.flink.connector.kafka.source.enumerator;
+
+/** status of partition assignment. */
+public enum AssignmentStatus {

Review Comment:
   We need to mark this new class as `@Internal`



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -56,69 +67,114 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(KafkaSourceEnumState enumState) throws IOException {
-        return serializeTopicPartitions(enumState.assignedPartitions());
+        Set<TopicPartitionAndAssignmentStatus> partitions = enumState.partitions();
+        boolean initialDiscoveryFinished = enumState.initialDiscoveryFinished();
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(partitions.size());
+            for (TopicPartitionAndAssignmentStatus topicPartitionAndAssignmentStatus : partitions) {
+                out.writeUTF(topicPartitionAndAssignmentStatus.topicPartition().topic());
+                out.writeInt(topicPartitionAndAssignmentStatus.topicPartition().partition());
+                out.writeInt(topicPartitionAndAssignmentStatus.assignmentStatus().getStatusCode());
+            }
+            out.flush();
+            out.writeBoolean(initialDiscoveryFinished);
+            out.flush();
+            return baos.toByteArray();
+        }
     }
 
     @Override
     public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
-        if (version == CURRENT_VERSION) {
-            final Set<TopicPartition> assignedPartitions = deserializeTopicPartitions(serialized);
-            return new KafkaSourceEnumState(assignedPartitions);
+        switch (version) {
+            case CURRENT_VERSION:
+                return deserializeTopicPartitionAndAssignmentStatus(serialized);
+            case VERSION_1:
+                final Set<TopicPartition> assignedPartitions =
+                        deserializeTopicPartitions(serialized);
+                return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true);
+            case VERSION_0:
+                Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
+                        SerdeUtils.deserializeSplitAssignments(
+                                serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
+                Set<TopicPartition> currentAssignedSplits = new HashSet<>();
+                currentPartitionAssignment.forEach(
+                        (reader, splits) ->
+                                splits.forEach(
+                                        split ->
+                                                currentAssignedSplits.add(
+                                                        split.getTopicPartition())));
+                return new KafkaSourceEnumState(currentAssignedSplits, new HashSet<>(), true);
+            default:
+                throw new IOException(
+                        String.format(
+                                "The bytes are serialized with version %d, "
+                                        + "while this deserializer only supports version up to %d",
+                                version, CURRENT_VERSION));
         }
-
-        // Backward compatibility
-        if (version == VERSION_0) {
-            Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
-                    SerdeUtils.deserializeSplitAssignments(
-                            serialized, new KafkaPartitionSplitSerializer(), HashSet::new);
-            Set<TopicPartition> currentAssignedSplits = new HashSet<>();
-            currentPartitionAssignment.forEach(
-                    (reader, splits) ->
-                            splits.forEach(
-                                    split -> currentAssignedSplits.add(split.getTopicPartition())));
-            return new KafkaSourceEnumState(currentAssignedSplits);
-        }
-
-        throw new IOException(
-                String.format(
-                        "The bytes are serialized with version %d, "
-                                + "while this deserializer only supports version up to %d",
-                        version, CURRENT_VERSION));
     }
 
-    private static byte[] serializeTopicPartitions(Collection<TopicPartition> topicPartitions)
+    private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedTopicPartitions)
             throws IOException {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                DataOutputStream out = new DataOutputStream(baos)) {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);
+                DataInputStream in = new DataInputStream(bais)) {
 
-            out.writeInt(topicPartitions.size());
-            for (TopicPartition tp : topicPartitions) {
-                out.writeUTF(tp.topic());
-                out.writeInt(tp.partition());
+            final int numPartitions = in.readInt();
+            Set<TopicPartition> topicPartitions = new HashSet<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                final String topic = in.readUTF();
+                final int partition = in.readInt();
+                topicPartitions.add(new TopicPartition(topic, partition));
+            }
+            if (in.available() > 0) {
+                throw new IOException("Unexpected trailing bytes in serialized topic partitions");
             }
-            out.flush();
 
-            return baos.toByteArray();
+            return topicPartitions;
         }
     }
 
-    private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedTopicPartitions)
-            throws IOException {
+    private static KafkaSourceEnumState deserializeTopicPartitionAndAssignmentStatus(
+            byte[] serializedTopicPartitions) throws IOException {

Review Comment:
   The argument name is not quite appropriate: these bytes are the entire serialized `KafkaSourceEnumState`, not just the `TopicPartition`s.
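   Here is a minimal sketch of a reader that makes this explicit. It assumes the version-2 layout written by `serialize()` above: the partition count, then topic, partition, and status code for each entry, then the `initialDiscoveryFinished` flag. `DecodedEnumState`, `EnumStateReader`, and the parameter name `serializedEnumState` are hypothetical and only illustrate the naming point.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified result: "topic:partition" -> status code, plus the flag.
final class DecodedEnumState {
    final Map<String, Integer> statusByPartition;
    final boolean initialDiscoveryFinished;

    DecodedEnumState(Map<String, Integer> statusByPartition, boolean initialDiscoveryFinished) {
        this.statusByPartition = statusByPartition;
        this.initialDiscoveryFinished = initialDiscoveryFinished;
    }
}

final class EnumStateReader {
    // The parameter holds the whole serialized enumerator state, not only the partitions.
    static DecodedEnumState deserialize(byte[] serializedEnumState) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumState);
                DataInputStream in = new DataInputStream(bais)) {
            int numPartitions = in.readInt();
            Map<String, Integer> statusByPartition = new HashMap<>(numPartitions);
            for (int i = 0; i < numPartitions; i++) {
                String topic = in.readUTF();
                int partition = in.readInt();
                int statusCode = in.readInt();
                // ':' is not a legal character in Kafka topic names, so the key is unambiguous.
                statusByPartition.put(topic + ":" + partition, statusCode);
            }
            boolean initialDiscoveryFinished = in.readBoolean();
            if (in.available() > 0) {
                throw new IOException("Unexpected trailing bytes in serialized enumerator state");
            }
            return new DecodedEnumState(statusByPartition, initialDiscoveryFinished);
        }
    }
}
```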



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializerTest.java:
##########
@@ -53,37 +57,53 @@ public void testEnumStateSerde() throws IOException {
                 serializer.deserialize(serializer.getVersion(), bytes);
 
         assertThat(restoredState.assignedPartitions()).isEqualTo(state.assignedPartitions());
+        assertThat(restoredState.unassignedInitialPartitions())
+                .isEqualTo(state.unassignedInitialPartitions());
+        assertThat(restoredState.initialDiscoveryFinished()).isTrue();
     }
 
     @Test
     public void testBackwardCompatibility() throws IOException {
 
-        final Set<TopicPartition> topicPartitions = constructTopicPartitions();
+        final Set<TopicPartition> topicPartitions = constructTopicPartitions(0);
         final Map<Integer, Set<KafkaPartitionSplit>> splitAssignments =
                 toSplitAssignments(topicPartitions);
 
         // Create bytes in the way of KafkaEnumStateSerializer version 0 doing serialization
-        final byte[] bytes =
+        final byte[] bytesV0 =
                 SerdeUtils.serializeSplitAssignments(
                         splitAssignments, new KafkaPartitionSplitSerializer());
+        // Create bytes in the way of KafkaEnumStateSerializer version 1 doing serialization
+        final byte[] bytesV1 =
+                KafkaSourceEnumStateSerializer.serializeTopicPartitions(topicPartitions);
 
-        // Deserialize above bytes with KafkaEnumStateSerializer version 1 to check backward
+        // Deserialize above bytes with KafkaEnumStateSerializer version 2 to check backward
         // compatibility
-        final KafkaSourceEnumState kafkaSourceEnumState =
-                new KafkaSourceEnumStateSerializer().deserialize(0, bytes);
-
-        assertThat(kafkaSourceEnumState.assignedPartitions()).isEqualTo(topicPartitions);
+        final KafkaSourceEnumState kafkaSourceEnumStateV0 =
+                new KafkaSourceEnumStateSerializer().deserialize(0, bytesV0);
+        final KafkaSourceEnumState kafkaSourceEnumStateV1 =
+                new KafkaSourceEnumStateSerializer().deserialize(1, bytesV1);
+
+        assertThat(kafkaSourceEnumStateV0.assignedPartitions()).isEqualTo(topicPartitions);
+        assertThat(kafkaSourceEnumStateV0.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV0.initialDiscoveryFinished()).isTrue();
+
+        assertThat(kafkaSourceEnumStateV1.assignedPartitions()).isEqualTo(topicPartitions);
+        assertThat(kafkaSourceEnumStateV1.unassignedInitialPartitions()).isEmpty();
+        assertThat(kafkaSourceEnumStateV1.initialDiscoveryFinished()).isTrue();
     }
 
-    private Set<TopicPartition> constructTopicPartitions() {
+    private Set<TopicPartition> constructTopicPartitions(int startPartition) {
         // Create topic partitions for readers.
         // Reader i will be assigned with NUM_PARTITIONS_PER_TOPIC splits, with topic name
         // "topic-{i}" and
-        // NUM_PARTITIONS_PER_TOPIC partitions.
+        // NUM_PARTITIONS_PER_TOPIC partitions, which is start from startPartition.

Review Comment:
   NUM_PARTITIONS_PER_TOPIC partitions ~~, which is start from startPartition.~~ The starting partition number is `startPartition`.
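   To make the suggested wording concrete, here is a sketch of what the helper produces. It assumes reader i gets topic `topic-{i}` with partitions `startPartition` through `startPartition + NUM_PARTITIONS_PER_TOPIC - 1`. `SimpleTopicPartition`, `NUM_READERS`, and both constant values are hypothetical stand-ins, not the test's actual definitions.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical minimal stand-in for Kafka's TopicPartition.
final class SimpleTopicPartition {
    final String topic;
    final int partition;

    SimpleTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleTopicPartition)) {
            return false;
        }
        SimpleTopicPartition other = (SimpleTopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

final class TopicPartitionFixtures {
    // Assumed values for illustration only.
    static final int NUM_READERS = 10;
    static final int NUM_PARTITIONS_PER_TOPIC = 10;

    // Reader i gets topic "topic-{i}" with partitions
    // startPartition, ..., startPartition + NUM_PARTITIONS_PER_TOPIC - 1.
    static Set<SimpleTopicPartition> constructTopicPartitions(int startPartition) {
        Set<SimpleTopicPartition> topicPartitions = new HashSet<>();
        for (int readerId = 0; readerId < NUM_READERS; readerId++) {
            for (int p = startPartition; p < startPartition + NUM_PARTITIONS_PER_TOPIC; p++) {
                topicPartitions.add(new SimpleTopicPartition("topic-" + readerId, p));
            }
        }
        return topicPartitions;
    }
}
```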



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/AssignmentStatus.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.flink.connector.kafka.source.enumerator;
+
+/** status of partition assignment. */
+public enum AssignmentStatus {
+
+    /** Partitions that have been assigned to readers. */
+    ASSIGNED(0),
+    /**
+     * The partitions that have been discovered during initialization but not assigned to readers
+     * yet.
+     */
+    UNASSIGNED_INITIAL(1);
+    private int statusCode;
+
+    AssignmentStatus(int statusCode) {
+        this.statusCode = statusCode;
+    }
+
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public static AssignmentStatus findEnumByCode(int statusCode) {

Review Comment:
   Maybe use `ofStatusCode` as the name of the function? 
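   Here is a sketch of how the enum could look with this rename and the earlier `final` suggestion applied. Throwing `IllegalArgumentException` for an unknown code is an assumption, since the body of `findEnumByCode` is not shown here. `@Internal` (from `org.apache.flink.annotation`) is left out only so the snippet compiles without Flink on the classpath.

```java
// Sketch of AssignmentStatus with the review suggestions applied.
enum AssignmentStatus {
    /** Partitions that have been assigned to readers. */
    ASSIGNED(0),
    /** Partitions discovered during initialization but not yet assigned to readers. */
    UNASSIGNED_INITIAL(1);

    private final int statusCode;

    AssignmentStatus(int statusCode) {
        this.statusCode = statusCode;
    }

    public int getStatusCode() {
        return statusCode;
    }

    /** Returns the constant for a serialized status code; unknown codes are rejected. */
    public static AssignmentStatus ofStatusCode(int statusCode) {
        for (AssignmentStatus status : values()) {
            if (status.statusCode == statusCode) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown assignment status code: " + statusCode);
    }
}
```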



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
