RamanVerma commented on code in PR #28:
URL: https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1190292883


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java:
##########
@@ -22,18 +22,69 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The state of Kafka source enumerator. */
 @Internal
 public class KafkaSourceEnumState {
-    private final Set<TopicPartition> assignedPartitions;
+    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
+    private final Set<TopicPartitionWithAssignStatus> partitions;
+    /**
+     * this flag will be marked as true if inital partitions are discovered after enumerator starts.
+     */
+    private final boolean initialDiscoveryFinished;
 
-    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
-        this.assignedPartitions = assignedPartitions;
+    KafkaSourceEnumState(
+            Set<TopicPartition> assignPartitions,

Review Comment:
   Please change the parameter names to `assignedPartitions` and `unassignedInitialPartitions`.
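A minimal sketch of the constructor with the suggested parameter names. It assumes simplified versions of the PR's classes: `TopicPartition` is a local stand-in for Kafka's class, the `ASSIGNED`/`UNASSIGNED_INITIAL` values (0 and 1) are assumed, and plain loops replace the stream pipelines. It is illustrative only, not the PR's code:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for org.apache.kafka.common.TopicPartition so this sketch
// compiles without Kafka on the classpath (assumption, not the real class).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Simplified version of the PR's status holder; the constant values are assumed.
final class TopicPartitionWithAssignStatus {
    static final long ASSIGNED = 0L;
    static final long UNASSIGNED_INITIAL = 1L;

    private final TopicPartition topicPartition;
    private final long assignStatus;

    TopicPartitionWithAssignStatus(TopicPartition topicPartition, long assignStatus) {
        this.topicPartition = topicPartition;
        this.assignStatus = assignStatus;
    }

    TopicPartition topicPartition() {
        return topicPartition;
    }

    long assignStatus() {
        return assignStatus;
    }
}

final class KafkaSourceEnumState {
    private final Set<TopicPartitionWithAssignStatus> partitions = new HashSet<>();
    private final boolean initialDiscoveryFinished;

    // Parameter names as suggested in the review.
    KafkaSourceEnumState(
            Set<TopicPartition> assignedPartitions,
            Set<TopicPartition> unassignedInitialPartitions,
            boolean initialDiscoveryFinished) {
        // Tag every partition with its status before storing it.
        for (TopicPartition tp : assignedPartitions) {
            partitions.add(
                    new TopicPartitionWithAssignStatus(tp, TopicPartitionWithAssignStatus.ASSIGNED));
        }
        for (TopicPartition tp : unassignedInitialPartitions) {
            partitions.add(
                    new TopicPartitionWithAssignStatus(
                            tp, TopicPartitionWithAssignStatus.UNASSIGNED_INITIAL));
        }
        this.initialDiscoveryFinished = initialDiscoveryFinished;
    }

    Set<TopicPartitionWithAssignStatus> partitions() {
        return partitions;
    }

    boolean initialDiscoveryFinished() {
        return initialDiscoveryFinished;
    }
}
```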



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/TopicPartitionWithAssignStatus.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.kafka.common.TopicPartition;
+
+/** Kafka partition with assign status. */
+@Internal
+public class TopicPartitionWithAssignStatus {
+    private final TopicPartition topicPartition;
+    private final long assignStatus;

Review Comment:
   `assignmentStatus` would convey the meaning better than `assignStatus`.
   Also, I would prefer `TopicPartitionAndAssignmentStatus` over `TopicPartitionWithAssignStatus`.
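For illustration, the renamed class could look roughly like this. The `TopicPartition` stand-in, the status values, and the `equals`/`hashCode` implementations are assumptions made for the sketch, not taken from the PR:

```java
import java.util.Objects;

// Minimal stand-in for org.apache.kafka.common.TopicPartition (assumption).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

/** Kafka partition with assignment status (sketch using the suggested names). */
final class TopicPartitionAndAssignmentStatus {
    // Status values are assumed for illustration.
    static final long ASSIGNED = 0L;
    static final long UNASSIGNED_INITIAL = 1L;

    private final TopicPartition topicPartition;
    private final long assignmentStatus;

    TopicPartitionAndAssignmentStatus(TopicPartition topicPartition, long assignmentStatus) {
        this.topicPartition = Objects.requireNonNull(topicPartition);
        this.assignmentStatus = assignmentStatus;
    }

    TopicPartition topicPartition() {
        return topicPartition;
    }

    long assignmentStatus() {
        return assignmentStatus;
    }

    // Value semantics so instances behave correctly inside a Set.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionAndAssignmentStatus)) {
            return false;
        }
        TopicPartitionAndAssignmentStatus other = (TopicPartitionAndAssignmentStatus) o;
        return assignmentStatus == other.assignmentStatus
                && topicPartition.equals(other.topicPartition);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicPartition, assignmentStatus);
    }
}
```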



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java:
##########
@@ -22,18 +22,69 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The state of Kafka source enumerator. */
 @Internal
 public class KafkaSourceEnumState {
-    private final Set<TopicPartition> assignedPartitions;
+    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
+    private final Set<TopicPartitionWithAssignStatus> partitions;
+    /**
+     * this flag will be marked as true if inital partitions are discovered after enumerator starts.
+     */
+    private final boolean initialDiscoveryFinished;
 
-    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
-        this.assignedPartitions = assignedPartitions;
+    KafkaSourceEnumState(
+            Set<TopicPartition> assignPartitions,
+            Set<TopicPartition> unAssignInitialPartitions,
+            boolean initialDiscoveryFinished) {
+        this.partitions = new HashSet<>();
+        partitions.addAll(
+                assignPartitions.stream()
+                        .map(
+                                topicPartition ->
+                                        new TopicPartitionWithAssignStatus(
+                                                topicPartition,
+                                                TopicPartitionWithAssignStatus.ASSIGNED))
+                        .collect(Collectors.toSet()));
+        partitions.addAll(
+                unAssignInitialPartitions.stream()
+                        .map(
+                                topicPartition ->
+                                        new TopicPartitionWithAssignStatus(
+                                                topicPartition,
+                                                TopicPartitionWithAssignStatus.UNASSIGNED_INITIAL))
+                        .collect(Collectors.toSet()));
+        this.initialDiscoveryFinished = initialDiscoveryFinished;
+    }
+
+    public Set<TopicPartitionWithAssignStatus> partitions() {
+        return partitions;
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return assignedPartitions;
+        return partitions.stream()

Review Comment:
   Lines 68-74 and 78-84 contain largely duplicated code.
   Maybe you can define a private method that abstracts the common code and call it from `assignedPartitions()` and `unassignedPartitions()`.
   
   So, something like this:
   ```java
   private Set<TopicPartition> filterPartitions(long assignmentStatus);
   ```
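A rough sketch of the suggested helper, under similar simplifications: `TopicPartition` is a local stand-in for Kafka's class, the status values are assumed, and the constructor takes the status-tagged set directly so the example stays short:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Minimal stand-in for org.apache.kafka.common.TopicPartition (assumption).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Simplified status holder; the constant values are assumed.
final class TopicPartitionWithAssignStatus {
    static final long ASSIGNED = 0L;
    static final long UNASSIGNED_INITIAL = 1L;

    private final TopicPartition topicPartition;
    private final long assignStatus;

    TopicPartitionWithAssignStatus(TopicPartition topicPartition, long assignStatus) {
        this.topicPartition = topicPartition;
        this.assignStatus = assignStatus;
    }

    TopicPartition topicPartition() {
        return topicPartition;
    }

    long assignStatus() {
        return assignStatus;
    }
}

final class KafkaSourceEnumState {
    private final Set<TopicPartitionWithAssignStatus> partitions;

    // Takes the tagged set directly to keep the sketch short (assumption).
    KafkaSourceEnumState(Set<TopicPartitionWithAssignStatus> partitions) {
        this.partitions = new HashSet<>(partitions);
    }

    public Set<TopicPartition> assignedPartitions() {
        return filterPartitions(TopicPartitionWithAssignStatus.ASSIGNED);
    }

    public Set<TopicPartition> unassignedPartitions() {
        return filterPartitions(TopicPartitionWithAssignStatus.UNASSIGNED_INITIAL);
    }

    // Shared helper: keeps only partitions whose status matches, then drops the tag.
    private Set<TopicPartition> filterPartitions(long assignmentStatus) {
        return partitions.stream()
                .filter(p -> p.assignStatus() == assignmentStatus)
                .map(TopicPartitionWithAssignStatus::topicPartition)
                .collect(Collectors.toSet());
    }
}
```

Both public accessors then reduce to one-line calls, so adding another status later only needs a new accessor rather than another copy of the stream pipeline.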



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -113,10 +124,13 @@ public KafkaSourceEnumerator(
             Properties properties,
             SplitEnumeratorContext<KafkaPartitionSplit> context,
             Boundedness boundedness,
-            Set<TopicPartition> assignedPartitions) {
+            Set<TopicPartition> assignedPartitions,

Review Comment:
   It would be better to pass the `KafkaSourceEnumState` object to this constructor to limit the number of arguments.
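One possible shape for this, as a heavily simplified sketch: every constructor parameter except `Properties` and the restored state is omitted, and `KafkaSourceEnumState` here is a stand-in that stores the two sets directly. None of these signatures are taken from the PR:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

// Minimal stand-in for org.apache.kafka.common.TopicPartition (assumption).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Simplified stand-in for the PR's state class (stores the two sets directly).
final class KafkaSourceEnumState {
    private final Set<TopicPartition> assignedPartitions;
    private final Set<TopicPartition> unassignedInitialPartitions;
    private final boolean initialDiscoveryFinished;

    KafkaSourceEnumState(
            Set<TopicPartition> assignedPartitions,
            Set<TopicPartition> unassignedInitialPartitions,
            boolean initialDiscoveryFinished) {
        this.assignedPartitions = assignedPartitions;
        this.unassignedInitialPartitions = unassignedInitialPartitions;
        this.initialDiscoveryFinished = initialDiscoveryFinished;
    }

    Set<TopicPartition> assignedPartitions() { return assignedPartitions; }
    Set<TopicPartition> unassignedInitialPartitions() { return unassignedInitialPartitions; }
    boolean initialDiscoveryFinished() { return initialDiscoveryFinished; }
}

// Heavily simplified enumerator: only the constructor wiring is shown.
final class KafkaSourceEnumerator {
    private final Properties properties;
    private final Set<TopicPartition> assignedPartitions;
    private final Set<TopicPartition> unassignedInitialPartitions;
    private final boolean initialDiscoveryFinished;

    // Fresh start: delegate with an empty state.
    KafkaSourceEnumerator(Properties properties) {
        this(properties, new KafkaSourceEnumState(Collections.emptySet(), Collections.emptySet(), false));
    }

    // Restore: a single state object replaces several loose arguments.
    KafkaSourceEnumerator(Properties properties, KafkaSourceEnumState kafkaSourceEnumState) {
        this.properties = properties;
        this.assignedPartitions = new HashSet<>(kafkaSourceEnumState.assignedPartitions());
        this.unassignedInitialPartitions =
                new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
        this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
    }

    Set<TopicPartition> assignedPartitions() { return assignedPartitions; }
    Set<TopicPartition> unassignedInitialPartitions() { return unassignedInitialPartitions; }
    boolean initialDiscoveryFinished() { return initialDiscoveryFinished; }
}
```

A no-state overload that delegates with an empty `KafkaSourceEnumState` keeps a single code path for both a fresh start and a restore from a checkpoint.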



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
