This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dc3d0702087c8681435761eed7f2aa9161b09540
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Apr 1 17:56:29 2021 +0800

    [FLINK-21159][connector/kafka] Signal NoMoreSplitsEvent to all readers even without any assignments
    
    This closes #15461
---
 .../source/enumerator/KafkaSourceEnumerator.java   | 91 +++++++++++++---------
 .../connector/kafka/source/KafkaSourceITCase.java  | 25 ++++++
 .../connector/source/SplitEnumeratorContext.java   |  2 +-
 3 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 529df52..eba276b 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -87,6 +87,9 @@ public class KafkaSourceEnumerator
     // Lazily instantiated or mutable fields.
     private KafkaConsumer<byte[], byte[]> consumer;
     private AdminClient adminClient;
+
+    // This flag will be marked as true if periodic partition discovery is disabled AND the
+    // initial partition discovery has finished.
     private boolean noMoreNewPartitionSplits = false;
 
     public KafkaSourceEnumerator(
@@ -170,7 +173,11 @@ public class KafkaSourceEnumerator
     @Override
     public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {
         addPartitionSplitChangeToPendingAssignments(splits);
-        assignPendingPartitionSplits();
+
+        // If the failed subtask has already restarted, we need to assign pending splits to it
+        if (context.registeredReaders().containsKey(subtaskId)) {
+            assignPendingPartitionSplits(Collections.singleton(subtaskId));
+        }
     }
 
     @Override
@@ -179,7 +186,7 @@ public class KafkaSourceEnumerator
                 "Adding reader {} to KafkaSourceEnumerator for consumer group {}.",
                 subtaskId,
                 consumerGroupId);
-        assignPendingPartitionSplits();
+        assignPendingPartitionSplits(Collections.singleton(subtaskId));
     }
 
     @Override
@@ -232,12 +239,12 @@ public class KafkaSourceEnumerator
             throw new FlinkRuntimeException("Failed to handle partition splits change due to ", t);
         }
         if (partitionDiscoveryIntervalMs < 0) {
-            LOG.debug("");
+            LOG.debug("Partition discovery is disabled.");
             noMoreNewPartitionSplits = true;
         }
         // TODO: Handle removed partitions.
         addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
-        assignPendingPartitionSplits();
+        assignPendingPartitionSplits(context.registeredReaders().keySet());
     }
 
     // This method should only be invoked in the coordinator executor thread.
@@ -258,42 +265,52 @@ public class KafkaSourceEnumerator
     }
 
     // This method should only be invoked in the coordinator executor thread.
-    private void assignPendingPartitionSplits() {
+    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
         Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();
-        pendingPartitionSplitAssignment.forEach(
-                (ownerReader, pendingSplits) -> {
-                    if (!pendingSplits.isEmpty()
-                            && context.registeredReaders().containsKey(ownerReader)) {
-                        // The owner reader is ready, assign the split to the owner reader.
-                        incrementalAssignment
-                                .computeIfAbsent(ownerReader, r -> new ArrayList<>())
-                                .addAll(pendingSplits);
-                    }
-                });
-        if (incrementalAssignment.isEmpty()) {
-            // No assignment is made.
-            return;
+
+        // Check if there are any pending splits for the given readers
+        for (int pendingReader : pendingReaders) {
+            checkReaderRegistered(pendingReader);
+
+            // Remove pending assignment for the reader
+            final Set<KafkaPartitionSplit> pendingAssignmentForReader =
+                    pendingPartitionSplitAssignment.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+                // Put pending assignment into incremental assignment
+                incrementalAssignment
+                        .computeIfAbsent(pendingReader, ArrayList::new)
+                        .addAll(pendingAssignmentForReader);
+
+                // Mark pending partitions as already assigned
+                pendingAssignmentForReader.forEach(
+                        split -> assignedPartitions.add(split.getTopicPartition()));
+            }
         }
-        LOG.info("Assigning splits to readers {}", incrementalAssignment);
-        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
-        incrementalAssignment.forEach(
-                (readerOwner, newPartitionSplits) -> {
-                    // Update the split assignment.
-                    newPartitionSplits.forEach(
-                            split -> assignedPartitions.add(split.getTopicPartition()));
-                    // Clear the pending splits for the reader owner.
-                    pendingPartitionSplitAssignment.remove(readerOwner);
-                    // Sends NoMoreSplitsEvent to the readers if there is no more partition splits
-                    // to be assigned.
-                    if (noMoreNewPartitionSplits) {
-                        LOG.debug(
-                                "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers "
-                                        + "in consumer group {}.",
-                                consumerGroupId);
-                        context.signalNoMoreSplits(readerOwner);
-                    }
-                });
+        // Assign pending splits to readers
+        if (!incrementalAssignment.isEmpty()) {
+            LOG.info("Assigning splits to readers {}", incrementalAssignment);
+            context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+        }
+
+        // If periodic partition discovery is disabled and the initial discovery has finished,
+        // signal NoMoreSplitsEvent to pending readers
+        if (noMoreNewPartitionSplits) {
+            LOG.debug(
+                    "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
+                            + " in consumer group {}.",
+                    pendingReaders,
+                    consumerGroupId);
+            pendingReaders.forEach(context::signalNoMoreSplits);
+        }
+    }
+
+    private void checkReaderRegistered(int readerId) {
+        if (!context.registeredReaders().containsKey(readerId)) {
+            throw new IllegalStateException(
+                    String.format("Reader %d is not registered to source coordinator", readerId));
+        }
     }
 
     private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 6f7c66d..89ce39c 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -172,6 +173,30 @@ public class KafkaSourceITCase {
         assertEquals(expectedSum, actualSum.get());
     }
 
+    @Test(timeout = 30000L)
+    public void testRedundantParallelism() throws Exception {
+        KafkaSource<PartitionAndValue> source =
+                KafkaSource.<PartitionAndValue>builder()
+                        .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                        .setGroupId("testRedundantParallelism")
+                        .setTopics(Collections.singletonList(TOPIC1))
+                        .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Here we use (NUM_PARTITIONS + 1) as the parallelism, so one SourceReader will not be
+        // assigned any splits. The redundant SourceReader should also be signaled with a
+        // NoMoreSplitsEvent and eventually spin to the FINISHED state.
+        env.setParallelism(KafkaSourceTestEnv.NUM_PARTITIONS + 1);
+        DataStream<PartitionAndValue> stream =
+                env.fromSource(
+                        source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
+        executeAndVerify(env, stream);
+    }
+
     // -----------------
 
     private static class PartitionAndValue implements Serializable {
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 66b3ef4..65be11c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -92,7 +92,7 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 
     /**
      * Invoke the callable and handover the return value to the handler which will be executed by
-     * the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>
+     * the source coordinator. When this method is invoked multiple times, the <code>Callable</code>
      * s may be executed in a thread pool concurrently.
      *
      * <p>It is important to make sure that the callable does not modify any shared state,
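
For context, below is a minimal standalone sketch of the job shape this commit fixes: a bounded
KafkaSource read whose parallelism exceeds the topic's partition count. It is not part of the
commit; the broker address, topic name, partition count, and class name are hypothetical
placeholders, and the deserializer setup assumes the connector's
KafkaRecordDeserializationSchema.valueOnly(...) helper rather than the test's custom schema.
Before this patch, the reader that received no split also never received a NoMoreSplitsEvent,
so a job like this would hang instead of finishing.

// Hypothetical standalone example; assumes a broker at localhost:9092 and a
// 4-partition topic "my-topic".
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RedundantParallelismExample {
    public static void main(String[] args) throws Exception {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setGroupId("redundant-parallelism-example")
                        .setTopics("my-topic")
                        .setDeserializer(
                                KafkaRecordDeserializationSchema.valueOnly(
                                        new SimpleStringSchema()))
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        // Bounded read: every reader must eventually learn that no
                        // more splits will arrive, or the job never finishes.
                        .setBounded(OffsetsInitializer.latest())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One reader more than there are partitions: the redundant reader gets no
        // split and only reaches FINISHED if the enumerator signals NoMoreSplitsEvent.
        env.setParallelism(5);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("redundant-parallelism-example");
    }
}

With the patched enumerator, handlePartitionSplitChanges signals NoMoreSplitsEvent to every
registered reader, including those with no assignment, so the redundant reader above finishes.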
