loserwang1024 commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-kafka/pull/28#discussion_r1208763810


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java:
##########
@@ -22,18 +22,69 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The state of Kafka source enumerator. */
 @Internal
 public class KafkaSourceEnumState {
-    private final Set<TopicPartition> assignedPartitions;
+    /** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
+    private final Set<TopicPartitionWithAssignStatus> partitions;
+    /**
+     * this flag will be marked as true if inital partitions are discovered 
after enumerator starts.
+     */
+    private final boolean initialDiscoveryFinished;
 
-    KafkaSourceEnumState(Set<TopicPartition> assignedPartitions) {
-        this.assignedPartitions = assignedPartitions;
+    KafkaSourceEnumState(
+            Set<TopicPartition> assignPartitions,
+            Set<TopicPartition> unAssignInitialPartitions,
+            boolean initialDiscoveryFinished) {
+        this.partitions = new HashSet<>();
+        partitions.addAll(
+                assignPartitions.stream()
+                        .map(
+                                topicPartition ->
+                                        new TopicPartitionWithAssignStatus(
+                                                topicPartition,
+                                                
TopicPartitionWithAssignStatus.ASSIGNED))
+                        .collect(Collectors.toSet()));
+        partitions.addAll(
+                unAssignInitialPartitions.stream()
+                        .map(
+                                topicPartition ->
+                                        new TopicPartitionWithAssignStatus(
+                                                topicPartition,
+                                                
TopicPartitionWithAssignStatus.UNASSIGNED_INITIAL))
+                        .collect(Collectors.toSet()));
+        this.initialDiscoveryFinished = initialDiscoveryFinished;
+    }
+
+    public Set<TopicPartitionWithAssignStatus> partitions() {
+        return partitions;
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return assignedPartitions;
+        return partitions.stream()

Review Comment:
   @RamanVerma Thanks for your advice. Would you like to code review again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to