This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new a595fbb [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource a595fbb is described below commit a595fbbfe0087181f1b92d2d991a885911f290a2 Author: Fabian Paul <fabianp...@ververica.com> AuthorDate: Tue Sep 21 13:38:15 2021 +0200 [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource Before this commit the enumerator signalled the leftover source readers without a partition to finish. This meant that checkpointing was no longer possible, because checkpointing is only supported if all tasks are running or FLIP-147 is enabled. This closes #17327 --- .../flink/connector/kafka/source/KafkaSource.java | 4 ++- .../source/enumerator/KafkaSourceEnumerator.java | 10 ++++-- .../source/enumerator/KafkaEnumeratorTest.java | 2 ++ .../connectors/kafka/KafkaConsumerTestBase.java | 4 ++- .../kafka/testutils/ValidatingExactlyOnceSink.java | 38 ++++++++++++++++------ 5 files changed, 44 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 9a05089..6df7d2f 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -175,7 +175,8 @@ public class KafkaSource<OUT> startingOffsetsInitializer, stoppingOffsetsInitializer, props, - enumContext); + enumContext, + boundedness); } @Override @@ -189,6 +190,7 @@ public class KafkaSource<OUT> stoppingOffsetsInitializer, props, enumContext, + boundedness, checkpoint.assignedPartitions()); } diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 4f324d6..8b89b7e 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; @@ -65,6 +66,7 @@ public class KafkaSourceEnumerator private final Properties properties; private final long partitionDiscoveryIntervalMs; private final SplitEnumeratorContext<KafkaPartitionSplit> context; + private final Boundedness boundedness; /** Partitions that have been assigned to readers. 
*/ private final Set<TopicPartition> assignedPartitions; @@ -91,13 +93,15 @@ public class KafkaSourceEnumerator OffsetsInitializer startingOffsetInitializer, OffsetsInitializer stoppingOffsetInitializer, Properties properties, - SplitEnumeratorContext<KafkaPartitionSplit> context) { + SplitEnumeratorContext<KafkaPartitionSplit> context, + Boundedness boundedness) { this( subscriber, startingOffsetInitializer, stoppingOffsetInitializer, properties, context, + boundedness, Collections.emptySet()); } @@ -107,12 +111,14 @@ public class KafkaSourceEnumerator OffsetsInitializer stoppingOffsetInitializer, Properties properties, SplitEnumeratorContext<KafkaPartitionSplit> context, + Boundedness boundedness, Set<TopicPartition> assignedPartitions) { this.subscriber = subscriber; this.startingOffsetInitializer = startingOffsetInitializer; this.stoppingOffsetInitializer = stoppingOffsetInitializer; this.properties = properties; this.context = context; + this.boundedness = boundedness; this.assignedPartitions = new HashSet<>(assignedPartitions); this.pendingPartitionSplitAssignment = new HashMap<>(); @@ -353,7 +359,7 @@ public class KafkaSourceEnumerator // If periodically partition discovery is disabled and the initializing discovery has done, // signal NoMoreSplitsEvent to pending readers - if (noMoreNewPartitionSplits) { + if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) { LOG.debug( "No more KafkaPartitionSplits to assign. 
Sending NoMoreSplitsEvent to reader {}" + " in consumer group {}.", diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index fdab5f14..6968bea 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.source.enumerator; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; @@ -468,6 +469,7 @@ public class KafkaEnumeratorTest { stoppingOffsetsInitializer, props, enumContext, + Boundedness.CONTINUOUS_UNBOUNDED, assignedPartitions); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 1898b16..c231517 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1104,8 +1104,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { getStream(env, topic, schema, props) .map(new PartitionValidatingMapper(numPartitions, 1)) + // Job only fails after a checkpoint is taken and the necessary number of elements + // is seen .map(new 
FailingIdentityMapper<Integer>(failAfterElements)) - .addSink(new ValidatingExactlyOnceSink(totalElements)) + .addSink(new ValidatingExactlyOnceSink(totalElements, true)) .setParallelism(1); FailingIdentityMapper.failedBefore = false; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java index 149a730..7b0faf6 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.testutils; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; @@ -33,13 +34,14 @@ import java.util.List; /** A {@link RichSinkFunction} that verifies that no duplicate records are generated. 
*/ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> - implements ListCheckpointed<Tuple2<Integer, BitSet>>, Runnable { + implements ListCheckpointed<Tuple2<Integer, BitSet>>, Runnable, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class); private static final long serialVersionUID = 1748426382527469932L; private final int numElementsTotal; + private final boolean waitForFinalCheckpoint; private BitSet duplicateChecker = new BitSet(); // this is checkpointed @@ -49,7 +51,12 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> private volatile boolean printerRunning = true; public ValidatingExactlyOnceSink(int numElementsTotal) { + this(numElementsTotal, false); + } + + public ValidatingExactlyOnceSink(int numElementsTotal, boolean waitForFinalCheckpoint) { this.numElementsTotal = numElementsTotal; + this.waitForFinalCheckpoint = waitForFinalCheckpoint; } @Override @@ -67,15 +74,8 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> throw new Exception("Received a duplicate: " + value); } duplicateChecker.set(value); - if (numElements == numElementsTotal) { - // validate - if (duplicateChecker.cardinality() != numElementsTotal) { - throw new Exception("Duplicate checker has wrong cardinality"); - } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) { - throw new Exception("Received sparse sequence"); - } else { - throw new SuccessException(); - } + if (!waitForFinalCheckpoint) { + checkFinish(); } } @@ -125,4 +125,22 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> numElementsTotal); } } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + checkFinish(); + } + + private void checkFinish() throws Exception { + if (numElements == numElementsTotal) { + // validate + if (duplicateChecker.cardinality() != numElementsTotal) { + throw new Exception("Duplicate checker 
has wrong cardinality"); + } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) { + throw new Exception("Received sparse sequence"); + } else { + throw new SuccessException(); + } + } + } }