This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 28211c0 [FLINK-24347][connectors/kafka] Keep idle source readers if
parallelism is higher than partitions in KafkaSource
28211c0 is described below
commit 28211c0dec1dfb706c5d9583fd757cdd4de38ce8
Author: Fabian Paul <[email protected]>
AuthorDate: Wed Sep 22 01:14:15 2021 +0200
[FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is
higher than partitions in KafkaSource
Before this commit the enumerator signalled the leftover source readers
without a partition to finish. This made checkpointing impossible,
because checkpointing is only supported if all tasks are running
or FLIP-147 is enabled.
This closes #17330
---
.../flink/connector/kafka/source/KafkaSource.java | 4 ++-
.../source/enumerator/KafkaSourceEnumerator.java | 10 ++++--
.../source/enumerator/KafkaEnumeratorTest.java | 2 ++
.../connectors/kafka/KafkaConsumerTestBase.java | 4 ++-
.../kafka/testutils/ValidatingExactlyOnceSink.java | 38 ++++++++++++++++------
5 files changed, 44 insertions(+), 14 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index d1219c0..30c4dd2 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -175,7 +175,8 @@ public class KafkaSource<OUT>
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
- enumContext);
+ enumContext,
+ boundedness);
}
@Override
@@ -189,6 +190,7 @@ public class KafkaSource<OUT>
stoppingOffsetsInitializer,
props,
enumContext,
+ boundedness,
checkpoint.assignedPartitions());
}
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 9c69f38..1af52c2 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
@@ -64,6 +65,7 @@ public class KafkaSourceEnumerator
private final Properties properties;
private final long partitionDiscoveryIntervalMs;
private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+ private final Boundedness boundedness;
// The internal states of the enumerator.
/**
@@ -97,13 +99,15 @@ public class KafkaSourceEnumerator
OffsetsInitializer startingOffsetInitializer,
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
- SplitEnumeratorContext<KafkaPartitionSplit> context) {
+ SplitEnumeratorContext<KafkaPartitionSplit> context,
+ Boundedness boundedness) {
this(
subscriber,
startingOffsetInitializer,
stoppingOffsetInitializer,
properties,
context,
+ boundedness,
Collections.emptySet());
}
@@ -113,12 +117,14 @@ public class KafkaSourceEnumerator
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
SplitEnumeratorContext<KafkaPartitionSplit> context,
+ Boundedness boundedness,
Set<TopicPartition> assignedPartitions) {
this.subscriber = subscriber;
this.startingOffsetInitializer = startingOffsetInitializer;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
this.properties = properties;
this.context = context;
+ this.boundedness = boundedness;
this.discoveredPartitions = new HashSet<>();
this.assignedPartitions = new HashSet<>(assignedPartitions);
@@ -296,7 +302,7 @@ public class KafkaSourceEnumerator
// If periodically partition discovery is disabled and the
initializing discovery has done,
// signal NoMoreSplitsEvent to pending readers
- if (noMoreNewPartitionSplits) {
+ if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
LOG.debug(
"No more KafkaPartitionSplits to assign. Sending
NoMoreSplitsEvent to reader {}"
+ " in consumer group {}.",
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index a46ac2d..edb4c81 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kafka.source.enumerator;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -432,6 +433,7 @@ public class KafkaEnumeratorTest {
stoppingOffsetsInitializer,
props,
enumContext,
+ Boundedness.CONTINUOUS_UNBOUNDED,
assignedPartitions);
}
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 93e0cfe..a76534f 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1104,8 +1104,10 @@ public abstract class KafkaConsumerTestBase extends
KafkaTestBaseWithFlink {
getStream(env, topic, schema, props)
.map(new PartitionValidatingMapper(numPartitions, 1))
+ // Job only fails after a checkpoint is taken and the
necessary number of elements
+ // is seen
.map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements, true))
.setParallelism(1);
FailingIdentityMapper.failedBefore = false;
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 41a2d86..f00be21 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -32,20 +33,26 @@ import java.util.List;
/** A {@link RichSinkFunction} that verifies that no duplicate records are
generated. */
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
- implements ListCheckpointed<Tuple2<Integer, BitSet>> {
+ implements ListCheckpointed<Tuple2<Integer, BitSet>>,
CheckpointListener {
private static final Logger LOG =
LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
private static final long serialVersionUID = 1748426382527469932L;
private final int numElementsTotal;
+ private final boolean waitForFinalCheckpoint;
private BitSet duplicateChecker = new BitSet(); // this is checkpointed
private int numElements; // this is checkpointed
public ValidatingExactlyOnceSink(int numElementsTotal) {
+ this(numElementsTotal, false);
+ }
+
+ public ValidatingExactlyOnceSink(int numElementsTotal, boolean
waitForFinalCheckpoint) {
this.numElementsTotal = numElementsTotal;
+ this.waitForFinalCheckpoint = waitForFinalCheckpoint;
}
@Override
@@ -56,15 +63,8 @@ public class ValidatingExactlyOnceSink extends
RichSinkFunction<Integer>
throw new Exception("Received a duplicate: " + value);
}
duplicateChecker.set(value);
- if (numElements == numElementsTotal) {
- // validate
- if (duplicateChecker.cardinality() != numElementsTotal) {
- throw new Exception("Duplicate checker has wrong cardinality");
- } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
- throw new Exception("Received sparse sequence");
- } else {
- throw new SuccessException();
- }
+ if (!waitForFinalCheckpoint) {
+ checkFinish();
}
}
@@ -87,4 +87,22 @@ public class ValidatingExactlyOnceSink extends
RichSinkFunction<Integer>
this.numElements = s.f0;
this.duplicateChecker = s.f1;
}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ checkFinish();
+ }
+
+ private void checkFinish() throws Exception {
+ if (numElements == numElementsTotal) {
+ // validate
+ if (duplicateChecker.cardinality() != numElementsTotal) {
+ throw new Exception("Duplicate checker has wrong cardinality");
+ } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+ throw new Exception("Received sparse sequence");
+ } else {
+ throw new SuccessException();
+ }
+ }
+ }
}