becketqin commented on a change in pull request #14789:
URL: https://github.com/apache/flink/pull/14789#discussion_r572647867

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -276,16 +284,21 @@ private void assignPendingPartitionSplits() {
                             .addAll(newPartitionSplits);
                     // Clear the pending splits for the reader owner.
                     pendingPartitionSplitAssignment.remove(readerOwner);
-                    // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
-                    // to be assigned.
-                    if (noMoreNewPartitionSplits) {
-                        LOG.debug(
-                                "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers "
-                                        + "in consumer group {}.",
-                                consumerGroupId);
-                        context.signalNoMoreSplits(readerOwner);
-                    }
                 });
+        if (noMoreNewPartitionSplits) {
+            signalNoMoreSplitsToNotNotifiedReaders();
+        }
+    }
+
+    private void signalNoMoreSplitsToNotNotifiedReaders() {
+        context.registeredReaders()
+                .forEach(
+                        (readerId, ignore) -> {
+                            if (!finishedReaders.contains(readerId)) {
+                                context.signalNoMoreSplits(readerId);
+                                finishedReaders.add(readerId);

Review comment:
What happens if the readers fail over? Do we assume that the source readers remember the reception of `NoMoreSplitsEvent`?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -64,6 +64,8 @@
     private final Properties properties;
     private final long partitionDiscoveryIntervalMs;
     private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+    private volatile boolean partitionDiscoveryFinished = false;

Review comment:
It seems that we can just reuse the `noMoreNewPartitionSplits` flag.
- If the periodic partition discovery is disabled, then after the first partition discovery is done, set the `noMoreNewPartitionSplits` flag to true. The subsequent `assignPendingPartitionSplits` will just send the `NoMoreSplitsEvent` to all the readers. As long as the flag is set in the main thread, the readers that registered before the first partition discovery is done will not receive a duplicate `NoMoreSplitsEvent` in this case.
- Otherwise, the `noMoreNewPartitionSplits` flag will always remain false, and no `NoMoreSplitsEvent` will be sent.
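
To make the flag-reuse suggestion concrete, here is a minimal, Flink-free sketch of one possible reading of it. The class `NoMoreSplitsFlagSketch`, its methods, and the event counter are hypothetical and exist only for illustration; they are not the enumerator's actual code. The sketch assumes that every callback runs on a single thread, which stands in for the enumerator's main thread.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of the suggested flag handling; not the real KafkaSourceEnumerator. */
final class NoMoreSplitsFlagSketch {
    // Assumption: the caller tells us whether periodic partition discovery is enabled.
    private final boolean periodicDiscoveryEnabled;
    // The single flag from the review comment; only touched from one thread here.
    private boolean noMoreNewPartitionSplits = false;
    private final Set<Integer> registeredReaders = new LinkedHashSet<>();
    // Stand-in for context.signalNoMoreSplits(readerId): counts events per reader.
    private final Map<Integer, Integer> noMoreSplitsEvents = new TreeMap<>();

    NoMoreSplitsFlagSketch(boolean periodicDiscoveryEnabled) {
        this.periodicDiscoveryEnabled = periodicDiscoveryEnabled;
    }

    /** A reader registers; this triggers an assignment pass. */
    void addReader(int readerId) {
        registeredReaders.add(readerId);
        assignPendingPartitionSplits();
    }

    /** Called when a partition discovery round has completed. */
    void onPartitionDiscoveryDone() {
        if (!periodicDiscoveryEnabled) {
            // One-shot discovery: no further partitions will ever show up.
            noMoreNewPartitionSplits = true;
        }
        assignPendingPartitionSplits();
    }

    /** Split assignment itself is omitted; only the signalling step is modelled. */
    private void assignPendingPartitionSplits() {
        if (noMoreNewPartitionSplits) {
            for (int readerId : registeredReaders) {
                noMoreSplitsEvents.merge(readerId, 1, Integer::sum);
            }
        }
    }

    Map<Integer, Integer> eventsPerReader() {
        return Collections.unmodifiableMap(noMoreSplitsEvents);
    }

    public static void main(String[] args) {
        NoMoreSplitsFlagSketch oneShot = new NoMoreSplitsFlagSketch(false);
        oneShot.addReader(0);
        oneShot.addReader(1);
        oneShot.onPartitionDiscoveryDone();
        System.out.println("one-shot discovery: " + oneShot.eventsPerReader());

        NoMoreSplitsFlagSketch periodic = new NoMoreSplitsFlagSketch(true);
        periodic.addReader(0);
        periodic.onPartitionDiscoveryDone();
        System.out.println("periodic discovery: " + periodic.eventsPerReader());
    }
}
```

In this sketch, readers that register before the first discovery completes see the flag as false on registration, so they are signalled exactly once when discovery finishes. A reader that registers after the flag is set triggers another full pass, so the sketch does not address duplicates in that case, nor the failover question raised in the first comment.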