This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new a595fbb  [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
a595fbb is described below

commit a595fbbfe0087181f1b92d2d991a885911f290a2
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Tue Sep 21 13:38:15 2021 +0200

    [FLINK-24347][connectors/kafka] Keep idle source readers if parallelism is higher than partitions in KafkaSource
    
    Before this commit, the enumerator signalled the leftover source readers without
    a partition to finish. As a result, checkpointing was no longer possible, because
    it is only supported when all tasks are running or FLIP-147 is enabled.
    
    This closes #17327
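
    For context, a job that triggers this scenario pairs a KafkaSource with a
    parallelism higher than the topic's partition count. The sketch below is
    illustrative only; broker address, topic name, group id, and parallelism
    are placeholders, not values from this patch.

        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // Topic assumed to have 2 partitions while the source runs with
        // parallelism 4, so two readers receive no partition at all.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setTopics("two-partition-topic")      // placeholder
                        .setGroupId("demo-group")              // placeholder
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Before this fix the two partition-less readers were signalled to
        // finish, which blocked checkpointing; with it they stay idle.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(4)
                .print();

        env.execute();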
---
 .../flink/connector/kafka/source/KafkaSource.java  |  4 ++-
 .../source/enumerator/KafkaSourceEnumerator.java   | 10 ++++--
 .../source/enumerator/KafkaEnumeratorTest.java     |  2 ++
 .../connectors/kafka/KafkaConsumerTestBase.java    |  4 ++-
 .../kafka/testutils/ValidatingExactlyOnceSink.java | 38 ++++++++++++++++------
 5 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 9a05089..6df7d2f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -175,7 +175,8 @@ public class KafkaSource<OUT>
                 startingOffsetsInitializer,
                 stoppingOffsetsInitializer,
                 props,
-                enumContext);
+                enumContext,
+                boundedness);
     }
 
     @Override
@@ -189,6 +190,7 @@ public class KafkaSource<OUT>
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
+                boundedness,
                 checkpoint.assignedPartitions());
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 4f324d6..8b89b7e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source.enumerator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
@@ -65,6 +66,7 @@ public class KafkaSourceEnumerator
     private final Properties properties;
     private final long partitionDiscoveryIntervalMs;
     private final SplitEnumeratorContext<KafkaPartitionSplit> context;
+    private final Boundedness boundedness;
 
     /** Partitions that have been assigned to readers. */
     private final Set<TopicPartition> assignedPartitions;
@@ -91,13 +93,15 @@ public class KafkaSourceEnumerator
             OffsetsInitializer startingOffsetInitializer,
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
-            SplitEnumeratorContext<KafkaPartitionSplit> context) {
+            SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness) {
         this(
                 subscriber,
                 startingOffsetInitializer,
                 stoppingOffsetInitializer,
                 properties,
                 context,
+                boundedness,
                 Collections.emptySet());
     }
 
@@ -107,12 +111,14 @@ public class KafkaSourceEnumerator
             OffsetsInitializer stoppingOffsetInitializer,
             Properties properties,
             SplitEnumeratorContext<KafkaPartitionSplit> context,
+            Boundedness boundedness,
             Set<TopicPartition> assignedPartitions) {
         this.subscriber = subscriber;
         this.startingOffsetInitializer = startingOffsetInitializer;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
         this.properties = properties;
         this.context = context;
+        this.boundedness = boundedness;
 
         this.assignedPartitions = new HashSet<>(assignedPartitions);
         this.pendingPartitionSplitAssignment = new HashMap<>();
@@ -353,7 +359,7 @@ public class KafkaSourceEnumerator
 
         // If periodic partition discovery is disabled and the initial discovery has finished,
         // signal NoMoreSplitsEvent to pending readers
-        if (noMoreNewPartitionSplits) {
+        if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
             LOG.debug(
                     "No more KafkaPartitionSplits to assign. Sending 
NoMoreSplitsEvent to reader {}"
                             + " in consumer group {}.",
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index fdab5f14..6968bea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator;
 
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
@@ -468,6 +469,7 @@ public class KafkaEnumeratorTest {
                 stoppingOffsetsInitializer,
                 props,
                 enumContext,
+                Boundedness.CONTINUOUS_UNBOUNDED,
                 assignedPartitions);
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1898b16..c231517 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1104,8 +1104,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
         getStream(env, topic, schema, props)
                 .map(new PartitionValidatingMapper(numPartitions, 1))
+                // Job only fails after a checkpoint is taken and the necessary number of elements
+                // is seen
                 .map(new FailingIdentityMapper<Integer>(failAfterElements))
-                .addSink(new ValidatingExactlyOnceSink(totalElements))
+                .addSink(new ValidatingExactlyOnceSink(totalElements, true))
                 .setParallelism(1);
 
         FailingIdentityMapper.failedBefore = false;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 149a730..7b0faf6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -33,13 +34,14 @@ import java.util.List;
 
 /** A {@link RichSinkFunction} that verifies that no duplicate records are generated. */
 public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
-        implements ListCheckpointed<Tuple2<Integer, BitSet>>, Runnable {
+        implements ListCheckpointed<Tuple2<Integer, BitSet>>, Runnable, CheckpointListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
     private static final long serialVersionUID = 1748426382527469932L;
 
     private final int numElementsTotal;
+    private final boolean waitForFinalCheckpoint;
 
     private BitSet duplicateChecker = new BitSet(); // this is checkpointed
 
@@ -49,7 +51,12 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
     private volatile boolean printerRunning = true;
 
     public ValidatingExactlyOnceSink(int numElementsTotal) {
+        this(numElementsTotal, false);
+    }
+
+    public ValidatingExactlyOnceSink(int numElementsTotal, boolean waitForFinalCheckpoint) {
         this.numElementsTotal = numElementsTotal;
+        this.waitForFinalCheckpoint = waitForFinalCheckpoint;
     }
 
     @Override
@@ -67,15 +74,8 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
             throw new Exception("Received a duplicate: " + value);
         }
         duplicateChecker.set(value);
-        if (numElements == numElementsTotal) {
-            // validate
-            if (duplicateChecker.cardinality() != numElementsTotal) {
-                throw new Exception("Duplicate checker has wrong cardinality");
-            } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
-                throw new Exception("Received sparse sequence");
-            } else {
-                throw new SuccessException();
-            }
+        if (!waitForFinalCheckpoint) {
+            checkFinish();
         }
     }
 
@@ -125,4 +125,22 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer>
                     numElementsTotal);
         }
     }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        checkFinish();
+    }
+
+    private void checkFinish() throws Exception {
+        if (numElements == numElementsTotal) {
+            // validate
+            if (duplicateChecker.cardinality() != numElementsTotal) {
+                throw new Exception("Duplicate checker has wrong cardinality");
+            } else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+                throw new Exception("Received sparse sequence");
+            } else {
+                throw new SuccessException();
+            }
+        }
+    }
 }
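
For test authors, the new waitForFinalCheckpoint flag defers the final
validation from invoke() to notifyCheckpointComplete(), so SuccessException is
only thrown once a checkpoint has actually completed. A hypothetical wiring
(interval and counts are placeholders):

    // The sink still counts and de-duplicates in invoke(), but with the flag
    // set it only checks for completion after a successful checkpoint.
    env.enableCheckpointing(500);
    stream.map(new FailingIdentityMapper<Integer>(failAfterElements))
          .addSink(new ValidatingExactlyOnceSink(totalElements, true))
          .setParallelism(1);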
