This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc3d0702087c8681435761eed7f2aa9161b09540
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Apr 1 17:56:29 2021 +0800

    [FLINK-21159][connector/kafka] Signal NoMoreSplitsEvent to all readers even without any assignments
    
    This closes #15461
---
 .../source/enumerator/KafkaSourceEnumerator.java   | 91 +++++++++++++---------
 .../connector/kafka/source/KafkaSourceITCase.java  | 25 ++++++
 .../connector/source/SplitEnumeratorContext.java   |  2 +-
 3 files changed, 80 insertions(+), 38 deletions(-)
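
For context, FLINK-21159 affects bounded jobs whose parallelism exceeds the
number of Kafka partitions: a reader that receives no splits was previously
never sent a NoMoreSplitsEvent, so it never finished and the job hung. Below
is a minimal sketch of such a setup, not code from this patch; the broker
address, topic, group id, placeholder partition count, and the use of
setValueOnlyDeserializer are illustrative assumptions:

    import java.util.Collections;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Bounded job with one more reader than there are partitions. Before this
    // fix, the split-less reader was never signaled, so the job never reached
    // the FINISHED state.
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")            // assumed broker
                    .setGroupId("flink-21159-repro")                  // assumed group id
                    .setTopics(Collections.singletonList("my-topic")) // assumed topic
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setBounded(OffsetsInitializer.latest())
                    .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(numPartitionsOfMyTopic + 1); // placeholder count; one reader gets no splits
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
    env.execute();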

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 529df52..eba276b 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -87,6 +87,9 @@ public class KafkaSourceEnumerator
     // Lazily instantiated or mutable fields.
     private KafkaConsumer<byte[], byte[]> consumer;
     private AdminClient adminClient;
+
+    // This flag will be marked as true if periodic partition discovery is disabled AND the
+    // initial partition discovery has finished.
     private boolean noMoreNewPartitionSplits = false;
 
     public KafkaSourceEnumerator(
@@ -170,7 +173,11 @@ public class KafkaSourceEnumerator
     @Override
     public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {
         addPartitionSplitChangeToPendingAssignments(splits);
-        assignPendingPartitionSplits();
+
+        // If the failed subtask has already restarted, we need to assign pending splits to it
+        if (context.registeredReaders().containsKey(subtaskId)) {
+            assignPendingPartitionSplits(Collections.singleton(subtaskId));
+        }
     }
 
     @Override
@@ -179,7 +186,7 @@ public class KafkaSourceEnumerator
                 "Adding reader {} to KafkaSourceEnumerator for consumer group 
{}.",
                 subtaskId,
                 consumerGroupId);
-        assignPendingPartitionSplits();
+        assignPendingPartitionSplits(Collections.singleton(subtaskId));
     }
 
     @Override
@@ -232,12 +239,12 @@ public class KafkaSourceEnumerator
             throw new FlinkRuntimeException("Failed to handle partition splits change due to ", t);
         }
         if (partitionDiscoveryIntervalMs < 0) {
-            LOG.debug("");
+            LOG.debug("Partition discovery is disabled.");
             noMoreNewPartitionSplits = true;
         }
         // TODO: Handle removed partitions.
         addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
-        assignPendingPartitionSplits();
+        assignPendingPartitionSplits(context.registeredReaders().keySet());
     }
 
     // This method should only be invoked in the coordinator executor thread.
@@ -258,42 +265,52 @@ public class KafkaSourceEnumerator
     }
 
     // This method should only be invoked in the coordinator executor thread.
-    private void assignPendingPartitionSplits() {
+    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
         Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();
-        pendingPartitionSplitAssignment.forEach(
-                (ownerReader, pendingSplits) -> {
-                    if (!pendingSplits.isEmpty()
-                            && context.registeredReaders().containsKey(ownerReader)) {
-                        // The owner reader is ready, assign the split to the owner reader.
-                        incrementalAssignment
-                                .computeIfAbsent(ownerReader, r -> new ArrayList<>())
-                                .addAll(pendingSplits);
-                    }
-                });
-        if (incrementalAssignment.isEmpty()) {
-            // No assignment is made.
-            return;
+
+        // Check if there are any pending splits for the given readers
+        for (int pendingReader : pendingReaders) {
+            checkReaderRegistered(pendingReader);
+
+            // Remove pending assignment for the reader
+            final Set<KafkaPartitionSplit> pendingAssignmentForReader =
+                    pendingPartitionSplitAssignment.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+                // Put pending assignment into incremental assignment
+                incrementalAssignment
+                        .computeIfAbsent(pendingReader, r -> new ArrayList<>())
+                        .addAll(pendingAssignmentForReader);
+
+                // Mark pending partitions as already assigned
+                pendingAssignmentForReader.forEach(
+                        split -> assignedPartitions.add(split.getTopicPartition()));
+            }
         }
 
-        LOG.info("Assigning splits to readers {}", incrementalAssignment);
-        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
-        incrementalAssignment.forEach(
-                (readerOwner, newPartitionSplits) -> {
-                    // Update the split assignment.
-                    newPartitionSplits.forEach(
-                            split -> assignedPartitions.add(split.getTopicPartition()));
-                    // Clear the pending splits for the reader owner.
-                    pendingPartitionSplitAssignment.remove(readerOwner);
-                    // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
-                    // to be assigned.
-                    if (noMoreNewPartitionSplits) {
-                        LOG.debug(
-                                "No more KafkaPartitionSplits to assign. 
Sending NoMoreSplitsEvent to the readers "
-                                        + "in consumer group {}.",
-                                consumerGroupId);
-                        context.signalNoMoreSplits(readerOwner);
-                    }
-                });
+        // Assign pending splits to readers
+        if (!incrementalAssignment.isEmpty()) {
+            LOG.info("Assigning splits to readers {}", incrementalAssignment);
+            context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+        }
+
+        // If periodic partition discovery is disabled and the initial discovery has finished,
+        // signal NoMoreSplitsEvent to the pending readers
+        if (noMoreNewPartitionSplits) {
+            LOG.debug(
+                    "No more KafkaPartitionSplits to assign. Sending 
NoMoreSplitsEvent to reader {}"
+                            + " in consumer group {}.",
+                    pendingReaders,
+                    consumerGroupId);
+            pendingReaders.forEach(context::signalNoMoreSplits);
+        }
+    }
+
+    private void checkReaderRegistered(int readerId) {
+        if (!context.registeredReaders().containsKey(readerId)) {
+            throw new IllegalStateException(
+                    String.format("Reader %d is not registered to source 
coordinator", readerId));
+        }
     }
 
     private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
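
For readers less familiar with the FLIP-27 enumerator contract used above:
split assignment and end-of-input signaling are separate calls on
SplitEnumeratorContext (assignSplits/assignSplit and signalNoMoreSplits). The
toy sketch below, a hypothetical enumerator far simpler than the Kafka one,
shows why a split-less reader must still be signaled:

    import java.util.ArrayDeque;
    import java.util.Queue;
    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;

    // Hypothetical split type, just enough to satisfy the SourceSplit contract.
    class ToySplit implements SourceSplit {
        private final String id;
        ToySplit(String id) { this.id = id; }
        @Override public String splitId() { return id; }
    }

    // Toy bounded-enumerator logic: once no new splits will ever appear, EVERY
    // registered reader must be signaled, including readers that never got one.
    class ToyEnumeratorLogic {
        private final Queue<ToySplit> remaining = new ArrayDeque<>();
        private final SplitEnumeratorContext<ToySplit> context;
        private final boolean noMoreNewSplits = true; // bounded case: discovery done

        ToyEnumeratorLogic(SplitEnumeratorContext<ToySplit> context) {
            this.context = context;
        }

        // Mirrors the role of addReader(int) in the patch above.
        void onReaderRegistered(int subtaskId) {
            ToySplit split = remaining.poll();
            if (split != null) {
                context.assignSplit(split, subtaskId);
            }
            if (noMoreNewSplits && remaining.isEmpty()) {
                // Without this call, a reader that got no split waits forever.
                context.signalNoMoreSplits(subtaskId);
            }
        }
    }
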
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 6f7c66d..89ce39c 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -172,6 +173,30 @@ public class KafkaSourceITCase {
         assertEquals(expectedSum, actualSum.get());
     }
 
+    @Test(timeout = 30000L)
+    public void testRedundantParallelism() throws Exception {
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setGroupId("testRedundantParallelism")
+                        .setTopics(Collections.singletonList(TOPIC1))
+                        .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Here we use (NUM_PARTITIONS + 1) as the parallelism, so one SourceReader will not be
+        // assigned any splits. The redundant SourceReader should also be signaled with a
+        // NoMoreSplitsEvent and eventually transition to the FINISHED state.
+        env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1);
+        DataStream<PartitionAndValue> stream =
+                env.fromSource(
+                        source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
+        executeAndVerify(env, stream);
+    }
+
     // -----------------
 
     private static class PartitionAndValue implements Serializable {
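
The test above depends on the suite's existing TestingKafkaRecordDeserializationSchema
and executeAndVerify helper, neither of which appears in this diff. As a rough
illustration of the shape such a deserializer takes, a hypothetical value-only
implementation of KafkaRecordDeserializationSchema might look like this (not
the class the test actually uses; import paths assume the connector's layout):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class ValueAsStringDeserializationSchema
            implements KafkaRecordDeserializationSchema<String> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
                throws IOException {
            // Emit only the record value, decoded as UTF-8.
            out.collect(new String(record.value(), StandardCharsets.UTF_8));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
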
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 66b3ef4..65be11c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -92,7 +92,7 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 
     /**
      * Invoke the callable and handover the return value to the handler which will be executed by
-     * the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>
+     * the source coordinator. When this method is invoked multiple times, the <code>Callable</code>
      * s may be executed in a thread pool concurrently.
      *
      * <p>It is important to make sure that the callable does not modify any shared state,
