becketqin commented on a change in pull request #15531:
URL: https://github.com/apache/flink/pull/15531#discussion_r646430067



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -206,12 +203,74 @@ public void close() {
 
     // ----------------- private methods -------------------
 
-    private PartitionSplitChange discoverAndInitializePartitionSplit() {
-        // Make a copy of the partitions to owners
-        KafkaSubscriber.PartitionChange partitionChange =
-                subscriber.getPartitionChanges(
-                        adminClient, Collections.unmodifiableSet(discoveredPartitions));
+    /**
+     * List subscribed topic partitions on Kafka brokers.
+     *
+     * <p>NOTE: This method should only be invoked in the worker executor thread, because it
+     * requires network I/O with Kafka brokers.
+     *
+     * @return Set of subscribed {@link TopicPartition}s
+     */
+    private Set<TopicPartition> listSubscribedTopicPartitions() {
+        return subscriber.listSubscribedTopicPartitions(adminClient);
+    }
+
+    /**
+     * Check if there's any partition changes within subscribed topic partitions fetched by worker
+     * thread, and invoke {@link KafkaSourceEnumerator#initializePartitionSplits(PartitionChange)}
+     * in worker thread to initialize splits for new partitions.
+     *
+     * <p>NOTE: This method should only be invoked in the coordinator executor thread.
+     *
+     * @param fetchedPartitions Map from topic name to its description
+     * @param t Exception in worker thread
+     */
+    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) {
+        if (t != null) {
+            throw new FlinkRuntimeException(
+                    "Failed to list subscribed topic partitions due to ", t);
+        }
+
+        Set<TopicPartition> removedPartitions = new HashSet<>();
+
+        for (TopicPartition discoveredPartition : discoveredPartitions()) {

Review comment:
       A micro-optimization: we can avoid creating the new `HashSet` here and instead have a `getNewAndRemovedPartitions(fetchedPartitions, removedPartitions)` method.
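
One possible reading of this suggestion, as a minimal sketch rather than the PR's actual code: walk the already-discovered partitions once, strip each one out of the fetched set, and record those that were not fetched as removed. What is left in the fetched set is then exactly the new partitions, so no extra copy is needed. The class name, the generic element type (standing in for `TopicPartition`), and the third `discoveredPartitions` parameter are assumptions made to keep the example free of Kafka dependencies.

```java
import java.util.Set;

class PartitionDiffSketch {

    /**
     * Hypothetical helper; the name comes from the review comment, the signature is assumed.
     *
     * After the call, fetchedPartitions holds only partitions that were not known before
     * (the new ones), and removedPartitions has been filled with known partitions that
     * were not fetched. fetchedPartitions is mutated in place on purpose.
     */
    static <T> void getNewAndRemovedPartitions(
            Set<T> fetchedPartitions, Set<T> discoveredPartitions, Set<T> removedPartitions) {
        for (T known : discoveredPartitions) {
            // remove() returns false when the known partition was not fetched,
            // which means it has disappeared from the cluster.
            if (!fetchedPartitions.remove(known)) {
                removedPartitions.add(known);
            }
        }
    }
}
```

The trade-off is that the caller must own `fetchedPartitions` and accept that it is modified; if the set is shared, a defensive copy brings the allocation back.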

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
##########
@@ -27,36 +27,45 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
-import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.maybeLog;
 
 /** A subscriber for a partition set. */
 class PartitionSetSubscriber implements KafkaSubscriber {
     private static final long serialVersionUID = 390970375272146036L;
     private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class);
-    private final Set<TopicPartition> partitions;
+    private final Set<TopicPartition> subscribedPartitions;
 
     PartitionSetSubscriber(Set<TopicPartition> partitions) {
-        this.partitions = partitions;
+        this.subscribedPartitions = partitions;
     }
 
     @Override
-    public PartitionChange getPartitionChanges(
-            AdminClient adminClient, Set<TopicPartition> currentAssignment) {
-        Set<TopicPartition> newPartitions = new HashSet<>();
-        Set<TopicPartition> removedPartitions = new HashSet<>(currentAssignment);
-
-        Map<String, TopicDescription> topicMetadata = getTopicMetadata(adminClient);
-        for (TopicPartition tp : partitions) {
-            TopicDescription topicDescription = topicMetadata.get(tp.topic());
-            if (topicDescription != null && topicDescription.partitions().size() > tp.partition()) {
-                if (!removedPartitions.remove(tp)) {
-                    newPartitions.add(tp);
-                }
+    public Set<TopicPartition> listSubscribedTopicPartitions(AdminClient adminClient) {
+        final Set<String> topicNames =
+                subscribedPartitions.stream()
+                        .map(TopicPartition::topic)
+                        .collect(Collectors.toSet());
+
+        LOG.debug("Fetching descriptions for topics: {}", topicNames);
+        final Map<String, TopicDescription> topicMetadata =
+                getTopicMetadata(adminClient, topicNames);
+
+        Set<TopicPartition> subscribedPartitions = new HashSet<>();
+
+        for (TopicPartition subscribedPartition : this.subscribedPartitions) {

Review comment:
       If the user subscribes to a partition that does not exist, should we throw an exception, or at least log a warning?
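
To make the question concrete, here is a rough sketch of what such a check might look like. It is not the PR's code: `Tp` is a stand-in for Kafka's `TopicPartition`, the `partitionCounts` map stands in for `TopicDescription#partitions().size()` per topic, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MissingPartitionCheckSketch {

    /** Stand-in for Kafka's TopicPartition, so the sketch has no Kafka dependency. */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /**
     * Returns the subscribed partitions that the fetched metadata does not contain.
     * partitionCounts maps a topic name to its number of partitions.
     */
    static List<Tp> findMissing(List<Tp> subscribed, Map<String, Integer> partitionCounts) {
        List<Tp> missing = new ArrayList<>();
        for (Tp tp : subscribed) {
            Integer count = partitionCounts.get(tp.topic);
            // Missing if the topic is unknown or the partition index is out of range.
            if (count == null || tp.partition >= count) {
                missing.add(tp);
            }
        }
        return missing;
    }

    /** Fail-fast variant; a caller that prefers a warning could log the list instead. */
    static void failIfMissing(List<Tp> subscribed, Map<String, Integer> partitionCounts) {
        List<Tp> missing = findMissing(subscribed, partitionCounts);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Subscribed partitions do not exist: " + missing);
        }
    }
}
```

Collecting every missing partition before failing gives the user one complete error message instead of one failure per restart.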

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java
##########
@@ -47,25 +46,18 @@
     }
 
     @Override
-    public PartitionChange getPartitionChanges(
-            AdminClient adminClient, Set<TopicPartition> currentAssignment) {
-        Set<TopicPartition> newPartitions = new HashSet<>();
-        Set<TopicPartition> removedPartitions = new HashSet<>(currentAssignment);
-
-        Map<String, TopicDescription> topicMetadata;
-        try {
-            topicMetadata = adminClient.describeTopics(topics).all().get();
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to get topic metadata.", e);
+    public Set<TopicPartition> listSubscribedTopicPartitions(AdminClient adminClient) {

Review comment:
       Can we add a unit test for subscribing to a non-existing topic as well?
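
As a rough illustration of the shape such a test could take, the sketch below uses a hand-rolled fake instead of a real `AdminClient` or a test framework. Everything here is assumed for illustration: `TopicMetadataLookup` is a hypothetical stand-in for the metadata call, partitions are represented as plain strings, and the expectation that the failure surfaces as a `RuntimeException` is borrowed from the wrapping visible in the removed lines of this hunk, not from the new implementation.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class NonExistingTopicTestSketch {

    /** Hypothetical stand-in for the AdminClient metadata call; not a Kafka API. */
    interface TopicMetadataLookup {
        /** Returns topic name -> partition count, or throws if a topic is unknown. */
        Map<String, Integer> describe(Set<String> topics) throws Exception;
    }

    /** Lists partitions as "topic-index" strings, wrapping lookup failures. */
    static Set<String> listSubscribedTopicPartitions(
            Set<String> topics, TopicMetadataLookup lookup) {
        Map<String, Integer> metadata;
        try {
            metadata = lookup.describe(topics);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get topic metadata.", e);
        }
        Set<String> partitions = new HashSet<>();
        for (Map.Entry<String, Integer> entry : metadata.entrySet()) {
            for (int p = 0; p < entry.getValue(); p++) {
                partitions.add(entry.getKey() + "-" + p);
            }
        }
        return partitions;
    }

    /** Test-shaped check: true if subscribing to an unknown topic fails loudly. */
    static boolean failsOnNonExistingTopic() {
        // Fake broker that knows no topics at all.
        TopicMetadataLookup brokerWithoutTopics = topics -> {
            throw new IllegalArgumentException("Unknown topics: " + topics);
        };
        Set<String> topics = new HashSet<>();
        topics.add("non-existing-topic");
        try {
            listSubscribedTopicPartitions(topics, brokerWithoutTopics);
            return false; // no failure means the missing topic went unnoticed
        } catch (RuntimeException expected) {
            return expected.getCause() instanceof IllegalArgumentException;
        }
    }
}
```

A real test would replace the fake with the connector's test environment and assert on whichever behavior the PR settles on for missing topics.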





