This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new a595fbb [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
a595fbb is described below
commit a595fbbfe0087181f1b92d2d991a885911f290a2
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Sep 21 13:38:15 2021 +0200
[FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
Before this commit, the enumerator signalled the leftover source readers
without a partition to finish. As a result, checkpointing was no longer
possible, because it is only supported while all tasks are running or
FLIP-147 is enabled.
This closes #17327
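For context, a minimal sketch of the scenario this fixes, assuming an unbounded
KafkaSource reading a topic with fewer partitions than the source parallelism
(broker address, topic name, and counts below are illustrative, not from the patch):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IdleReaderRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing must keep working even for readers that never get a partition.
            env.enableCheckpointing(1000L);

            KafkaSource<String> source =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("localhost:9092") // placeholder broker
                            .setTopics("two-partition-topic")      // assume 2 partitions
                            .setValueOnlyDeserializer(new SimpleStringSchema())
                            .build();                              // CONTINUOUS_UNBOUNDED by default

            // 4 readers, only 2 receive partitions. Before this fix the other 2 were
            // signalled to finish, which blocked checkpointing; now they stay idle.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .setParallelism(4)
                    .print();

            env.execute();
        }
    }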
---
.../flink/connector/kafka/source/KafkaSource.java | 4 ++-
.../source/enumerator/KafkaSourceEnumerator.java | 10 ++++--
.../source/enumerator/KafkaEnumeratorTest.java | 2 ++
.../connectors/kafka/KafkaConsumerTestBase.java | 4 ++-
.../kafka/testutils/ValidatingExactlyOnceSink.java | 38 ++++++++++++++++------
5 files changed, 44 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 9a05089..6df7d2f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -175,7 +175,8 @@ public class KafkaSource<OUT>
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
- enumContext);
+ enumContext,
+ boundedness);
}
@Override
@@ -189,6 +190,7 @@ public class KafkaSource<OUT>
stoppingOffsetsInitializer,
props,
enumContext,
+ boundedness,
checkpoint.assignedPartitions());
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 4f324d6..8b89b7e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
@@ -65,6 +66,7 @@ public class KafkaSourceEnumerator
private final Properties properties;
private final long partitionDiscoveryIntervalMs;
private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+ private final Boundedness boundedness;
/** Partitions that have been assigned to readers. */
private final Set<TopicPartition> assignedPartitions;
@@ -91,13 +93,15 @@ public class KafkaSourceEnumerator
OffsetsInitializer startingOffsetInitializer,
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
- SplitEnumeratorContext<KafkaPartitionSplit> context) {
+ SplitEnumeratorContext<KafkaPartitionSplit> context,
+ Boundedness boundedness) {
this(
subscriber,
startingOffsetInitializer,
stoppingOffsetInitializer,
properties,
context,
+ boundedness,
Collections.emptySet());
}
@@ -107,12 +111,14 @@ public class KafkaSourceEnumerator
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
SplitEnumeratorContext<KafkaPartitionSplit> context,
+ Boundedness boundedness,
Set<TopicPartition> assignedPartitions) {
this.subscriber = subscriber;
this.startingOffsetInitializer = startingOffsetInitializer;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
this.properties = properties;
this.context = context;
+ this.boundedness = boundedness;
this.assignedPartitions = new HashSet<>(assignedPartitions);
this.pendingPartitionSplitAssignment = new HashMap<>();
@@ -353,7 +359,7 @@ public class KafkaSourceEnumerator
// If periodically partition discovery is disabled and the initializing discovery has done,
// signal NoMoreSplitsEvent to pending readers
- if (noMoreNewPartitionSplits) {
+ if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
LOG.debug(
"No more KafkaPartitionSplits to assign. Sending
NoMoreSplitsEvent to reader {}"
+ " in consumer group {}.",
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index fdab5f14..6968bea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kafka.source.enumerator;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -468,6 +469,7 @@ public class KafkaEnumeratorTest {
stoppingOffsetsInitializer,
props,
enumContext,
+ Boundedness.CONTINUOUS_UNBOUNDED,
assignedPartitions);
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1898b16..c231517 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1104,8 +1104,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(numPartitions, 1))
+ // Job only fails after a checkpoint is taken and the necessary number of elements
+ // is seen
.map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements, true))
.setParallelism(1);
FailingIdentityMapper.failedBefore = false;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 149a730..7b0faf6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -33,13 +34,14 @@ import java.util.List;
/** A {@link RichSinkFunction} that verifies that no duplicate records are generated. */
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
- implements ListCheckpointed<Tuple2<Integer, BitSet>>, Runnable {
+ implements ListCheckpointed<Tuple2<Integer, BitSet>>, Runnable, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
private static final long serialVersionUID = 1748426382527469932L;
private final int numElementsTotal;
+ private final boolean waitForFinalCheckpoint;
private BitSet duplicateChecker = new BitSet(); // this is checkpointed
@@ -49,7 +51,12 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
private volatile boolean printerRunning = true;
public ValidatingExactlyOnceSink(int numElementsTotal) {
+ this(numElementsTotal, false);
+ }
+
+ public ValidatingExactlyOnceSink(int numElementsTotal, boolean waitForFinalCheckpoint) {
this.numElementsTotal = numElementsTotal;
+ this.waitForFinalCheckpoint = waitForFinalCheckpoint;
}
@Override
@@ -67,15 +74,8 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
throw new Exception("Received a duplicate: " + value);
}
duplicateChecker.set(value);
- if (numElements == numElementsTotal) {
- // validate
- if (duplicateChecker.cardinality() != numElementsTotal) {
- throw new Exception("Duplicate checker has wrong cardinality");
- } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
- throw new Exception("Received sparse sequence");
- } else {
- throw new SuccessException();
- }
+ if (!waitForFinalCheckpoint) {
+ checkFinish();
}
}
@@ -125,4 +125,22 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
numElementsTotal);
}
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ checkFinish();
+ }
+
+ private void checkFinish() throws Exception {
+ if (numElements == numElementsTotal) {
+ // validate
+ if (duplicateChecker.cardinality() != numElementsTotal) {
+ throw new Exception("Duplicate checker has wrong cardinality");
+ } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+ throw new Exception("Received sparse sequence");
+ } else {
+ throw new SuccessException();
+ }
+ }
+ }
}
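For reference, a minimal sketch of the deferral pattern the sink now uses: count
records in invoke(), but only declare success once a completed checkpoint covers
them. The class below is hypothetical, not part of the patch; CheckpointListener
and SuccessException are the real Flink types the patch uses.

    import org.apache.flink.api.common.state.CheckpointListener;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.test.util.SuccessException;

    // Hypothetical test sink: with idle readers kept running, the job never
    // finishes on its own, so the terminating exception must wait for a
    // completed checkpoint to guarantee exactly-once state is durable.
    class CheckpointGatedSink extends RichSinkFunction<Integer> implements CheckpointListener {
        private final int expected;
        private int seen;

        CheckpointGatedSink(int expected) {
            this.expected = expected;
        }

        @Override
        public void invoke(Integer value, Context context) {
            seen++; // record arrival; do not finish here, state is not durable yet
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (seen == expected) {
                // All records are covered by a completed checkpoint; end the test.
                throw new SuccessException();
            }
        }
    }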