becketqin commented on a change in pull request #14789:
URL: https://github.com/apache/flink/pull/14789#discussion_r572647867



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -276,16 +284,21 @@ private void assignPendingPartitionSplits() {
                             .addAll(newPartitionSplits);
                     // Clear the pending splits for the reader owner.
                     pendingPartitionSplitAssignment.remove(readerOwner);
-                    // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
-                    // to be assigned.
-                    if (noMoreNewPartitionSplits) {
-                        LOG.debug(
-                                "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers "
-                                        + "in consumer group {}.",
-                                consumerGroupId);
-                        context.signalNoMoreSplits(readerOwner);
-                    }
                 });
+        if (noMoreNewPartitionSplits) {
+            signalNoMoreSplitsToNotNotifiedReaders();
+        }
+    }
+
+    private void signalNoMoreSplitsToNotNotifiedReaders() {
+        context.registeredReaders()
+                .forEach(
+                        (readerId, ignore) -> {
+                            if (!finishedReaders.contains(readerId)) {
+                                context.signalNoMoreSplits(readerId);
+                                finishedReaders.add(readerId);

Review comment:
       What happens if the readers fail over? Do we assume that the source readers remember having received the `NoMoreSplitsEvent`?
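
To make the concern concrete, here is a toy model rather than the actual Flink classes. `FailoverSketch`, `registerReader`, `failOverReader` and `hasSeenSignal` are hypothetical names; only `finishedReaders` and `signalNoMoreSplitsToNotNotifiedReaders` come from the diff. It assumes a restarted reader comes back with no memory of the signal, which is exactly the open question.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model (not Flink code): an enumerator that remembers which readers it
// has already signalled, and readers that may lose that knowledge on restart.
class FailoverSketch {
    // Enumerator-side bookkeeping, named after the field in the diff.
    private final Set<Integer> finishedReaders = new HashSet<>();
    // Stand-in for reader-side state: has this reader seen the signal?
    private final Map<Integer, Boolean> readerSawSignal = new HashMap<>();

    void registerReader(int readerId) {
        readerSawSignal.put(readerId, false);
    }

    // Mirrors the shape of the method in the diff: signal each reader once.
    void signalNoMoreSplitsToNotNotifiedReaders() {
        for (Map.Entry<Integer, Boolean> reader : readerSawSignal.entrySet()) {
            if (!finishedReaders.contains(reader.getKey())) {
                reader.setValue(true);
                finishedReaders.add(reader.getKey());
            }
        }
    }

    // Assumption: the restarted reader does not remember the signal.
    // If enumeratorForgetsReader is true, the enumerator also drops its record,
    // so the next signalling pass reaches the reader again.
    void failOverReader(int readerId, boolean enumeratorForgetsReader) {
        readerSawSignal.put(readerId, false);
        if (enumeratorForgetsReader) {
            finishedReaders.remove(readerId);
        }
    }

    boolean hasSeenSignal(int readerId) {
        return readerSawSignal.getOrDefault(readerId, false);
    }
}
```

In this model, a reader that fails over while the enumerator keeps it in `finishedReaders` never sees the signal again. Removing it from the set on failover is one possible way out, under the assumption above.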

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -64,6 +64,8 @@
     private final Properties properties;
     private final long partitionDiscoveryIntervalMs;
     private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+    private volatile boolean partitionDiscoveryFinished = false;

Review comment:
       It seems that we can just reuse the `noMoreNewPartitionSplits` flag.
   
   - If periodic partition discovery is disabled, set the `noMoreNewPartitionSplits` flag to true once the first partition discovery is done. The subsequent `assignPendingPartitionSplits` call will then send the `NoMoreSplitsEvent` to all the readers. As long as the flag is set in the main thread, readers that registered before the first partition discovery finished will not receive a duplicate `NoMoreSplitsEvent`.
   - Otherwise, `noMoreNewPartitionSplits` always stays false, and no `NoMoreSplitsEvent` is sent.
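
A rough, single-threaded sketch of the two cases above, not the actual enumerator. `FlagReuseSketch`, `onPartitionDiscoveryDone` and `sentNoMoreSplits` are hypothetical names. Notifying a late-registering reader individually is an assumption of this sketch, not something the suggestion spells out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model (not Flink code) of driving NoMoreSplitsEvent from a single flag.
class FlagReuseSketch {
    private final boolean periodicDiscoveryEnabled;
    private final Set<Integer> registeredReaders = new LinkedHashSet<>();
    // Records every NoMoreSplitsEvent "sent", in order, for inspection.
    private final List<Integer> sentNoMoreSplits = new ArrayList<>();
    // Only touched from one thread here, standing in for the main thread.
    private boolean noMoreNewPartitionSplits = false;

    FlagReuseSketch(boolean periodicDiscoveryEnabled) {
        this.periodicDiscoveryEnabled = periodicDiscoveryEnabled;
    }

    void addReader(int readerId) {
        registeredReaders.add(readerId);
        // Assumption: a newly registered reader is handled on its own.
        assignPendingPartitionSplits(Collections.singletonList(readerId));
    }

    void onPartitionDiscoveryDone() {
        if (!periodicDiscoveryEnabled) {
            // One-shot discovery: nothing new will ever show up.
            noMoreNewPartitionSplits = true;
        }
        assignPendingPartitionSplits(new ArrayList<>(registeredReaders));
    }

    private void assignPendingPartitionSplits(Collection<Integer> readers) {
        // Split assignment itself is omitted; only the signalling is modelled.
        if (noMoreNewPartitionSplits) {
            sentNoMoreSplits.addAll(readers);
        }
    }

    List<Integer> sentNoMoreSplits() {
        return sentNoMoreSplits;
    }
}
```

With periodic discovery disabled, readers registered before the first discovery get nothing from `addReader` and exactly one event when discovery completes. With periodic discovery enabled, the flag never flips and nothing is sent.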



