This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new cdfa328b [FLINK-32416] Fix flaky tests by ensuring test utilities produce records consistently and by cleaning up the no-more-splits notification so it is reliably sent. This closes #79
cdfa328b is described below

commit cdfa328b5ec34d711ae2c9e93de6de7565fd1db6
Author: Mason Chen <mas.c...@berkeley.edu>
AuthorDate: Wed Jan 17 12:55:29 2024 -0800

    [FLINK-32416] Fix flaky tests by ensuring test utilities produce records consistently and by cleaning up the no-more-splits notification so it is reliably sent. This closes #79
---
 .../enumerator/DynamicKafkaSourceEnumerator.java   | 49 ++++++++++++----------
 .../enumerator/StoppableKafkaEnumContextProxy.java | 29 ++++++++++---
 .../source/reader/DynamicKafkaSourceReader.java    |  4 +-
 .../DynamicKafkaSourceEnumeratorTest.java          |  6 ++-
 .../StoppableKafkaEnumContextProxyTest.java        |  3 +-
 .../kafka/DynamicKafkaSourceTestHelper.java        |  4 +-
 6 files changed, 63 insertions(+), 32 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index cce8ab28..e14a36d9 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator
     private int kafkaMetadataServiceDiscoveryFailureCount;
     private Map<String, Set<String>> latestClusterTopicsMap;
     private Set<KafkaStream> latestKafkaStreams;
+    private boolean firstDiscoveryComplete;
 
     public DynamicKafkaSourceEnumerator(
             KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -151,6 +152,7 @@ public class DynamicKafkaSourceEnumerator
                         DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD,
                         Integer::parseInt);
         this.kafkaMetadataServiceDiscoveryFailureCount = 0;
+        this.firstDiscoveryComplete = false;
 
         this.kafkaMetadataService = kafkaMetadataService;
         this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory;
@@ -212,32 +214,27 @@ public class DynamicKafkaSourceEnumerator
 
     private void handleNoMoreSplits() {
         if (Boundedness.BOUNDED.equals(boundedness)) {
-            enumContext.runInCoordinatorThread(
-                    () -> {
-                        boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
-                        for (StoppableKafkaEnumContextProxy context :
-                                clusterEnumContextMap.values()) {
-                            allEnumeratorsHaveSignalledNoMoreSplits =
-                                    allEnumeratorsHaveSignalledNoMoreSplits
-                                            && context.isNoMoreSplits();
-                        }
-
-                        if (allEnumeratorsHaveSignalledNoMoreSplits) {
-                            logger.info(
-                                    "Signal no more splits to all readers: {}",
-                                    enumContext.registeredReaders().keySet());
-                            enumContext
-                                    .registeredReaders()
-                                    .keySet()
-                                    .forEach(enumContext::signalNoMoreSplits);
-                        }
-                    });
+            boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
+            for (StoppableKafkaEnumContextProxy context : clusterEnumContextMap.values()) {
+                allEnumeratorsHaveSignalledNoMoreSplits =
+                        allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits();
+            }
+
+            if (firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) {
+                logger.info(
+                        "Signal no more splits to all readers: {}",
+                        enumContext.registeredReaders().keySet());
+                enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits);
+            } else {
+                logger.info("Not ready to notify no more splits to readers.");
+            }
         }
     }
 
     // --------------- private methods for metadata discovery ---------------
 
     private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
+        firstDiscoveryComplete = true;
         Set<KafkaStream> handledFetchKafkaStreams =
                 handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);
 
@@ -370,9 +367,19 @@ public class DynamicKafkaSourceEnumerator
             Set<String> topics,
             KafkaSourceEnumState kafkaSourceEnumState,
             Properties fetchedProperties) {
+        final Runnable signalNoMoreSplitsCallback;
+        if (Boundedness.BOUNDED.equals(boundedness)) {
+            signalNoMoreSplitsCallback = this::handleNoMoreSplits;
+        } else {
+            signalNoMoreSplitsCallback = null;
+        }
+
         StoppableKafkaEnumContextProxy context =
                 stoppableKafkaEnumContextProxyFactory.create(
-                        enumContext, kafkaClusterId, kafkaMetadataService);
+                        enumContext,
+                        kafkaClusterId,
+                        kafkaMetadataService,
+                        signalNoMoreSplitsCallback);
 
         Properties consumerProps = new Properties();
         KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
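
The net effect of the enumerator changes: the no-more-splits check now runs
inline on the coordinator thread (where signalNoMoreSplits is already
delivered, per the comment added below) instead of being re-scheduled via
runInCoordinatorThread, and it is additionally gated on the first metadata
discovery having completed. Without that gate, an empty clusterEnumContextMap
leaves the all-match loop vacuously true, so readers could be signalled before
any topics were discovered. A standalone sketch of the gating condition, with
illustrative names rather than the connector's API:

    // Minimal model of the handleNoMoreSplits() gate; all state is confined
    // to the single coordinator thread, so plain fields suffice.
    class NoMoreSplitsGate {
        private boolean firstDiscoveryComplete = false;

        // Mirrors onHandleSubscribedStreamsFetch() flipping the flag.
        void markFirstDiscoveryComplete() {
            firstDiscoveryComplete = true;
        }

        // Readers may only be signalled once the initial stream discovery has
        // run AND every per-cluster enumerator reports no more splits.
        boolean shouldSignalReaders(java.util.Collection<Boolean> perClusterNoMoreSplits) {
            boolean allClustersDone = true;
            for (boolean done : perClusterNoMoreSplits) {
                allClustersDone = allClustersDone && done;
            }
            return firstDiscoveryComplete && allClustersDone;
        }

        public static void main(String[] args) {
            NoMoreSplitsGate gate = new NoMoreSplitsGate();
            // Before discovery, an empty context map must NOT trigger the signal.
            System.out.println(gate.shouldSignalReaders(java.util.List.of()));     // false
            gate.markFirstDiscoveryComplete();
            System.out.println(gate.shouldSignalReaders(java.util.List.of(true))); // true
        }
    }
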
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
index 5506c444..752a5d6b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -69,6 +71,7 @@ public class StoppableKafkaEnumContextProxy
     private final KafkaMetadataService kafkaMetadataService;
     private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
     private final ScheduledExecutorService subEnumeratorWorker;
+    private final Runnable signalNoMoreSplitsCallback;
     private boolean noMoreSplits = false;
     private volatile boolean isClosing;
 
@@ -79,17 +82,20 @@ public class StoppableKafkaEnumContextProxy
      *     KafkaSourceEnumerator
      * @param kafkaMetadataService the Kafka metadata service to facilitate error handling
      * @param enumContext the underlying enumerator context
+     * @param signalNoMoreSplitsCallback the callback when signal no more splits is invoked
      */
     public StoppableKafkaEnumContextProxy(
             String kafkaClusterId,
             KafkaMetadataService kafkaMetadataService,
-            SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
+            SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
+            @Nullable Runnable signalNoMoreSplitsCallback) {
         this.kafkaClusterId = kafkaClusterId;
         this.kafkaMetadataService = kafkaMetadataService;
         this.enumContext = enumContext;
         this.subEnumeratorWorker =
                 Executors.newScheduledThreadPool(
                         1, new ExecutorThreadFactory(kafkaClusterId + "-enum-worker"));
+        this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback;
         this.isClosing = false;
     }
 
@@ -147,8 +153,14 @@ public class StoppableKafkaEnumContextProxy
 
     @Override
     public void signalNoMoreSplits(int subtask) {
-        // there are no more splits for this cluster
+        // There are no more splits for this cluster, but we need to wait until all clusters are
+        // finished with their respective split discoveries. In the Kafka Source, this is called in
+        // the coordinator thread, ensuring thread safety, for all source readers at the same time.
         noMoreSplits = true;
+        if (signalNoMoreSplitsCallback != null) {
+            // Thread safe idempotent callback
+            signalNoMoreSplitsCallback.run();
+        }
     }
 
     /** Execute the one time callables in the coordinator. */
@@ -286,12 +298,19 @@ public class StoppableKafkaEnumContextProxy
         StoppableKafkaEnumContextProxy create(
                 SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
                 String kafkaClusterId,
-                KafkaMetadataService kafkaMetadataService);
+                KafkaMetadataService kafkaMetadataService,
+                Runnable signalNoMoreSplitsCallback);
 
         static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
-            return (enumContext, kafkaClusterId, kafkaMetadataService) ->
+            return (enumContext,
+                    kafkaClusterId,
+                    kafkaMetadataService,
+                    signalNoMoreSplitsCallback) ->
                     new StoppableKafkaEnumContextProxy(
-                            kafkaClusterId, kafkaMetadataService, enumContext);
+                            kafkaClusterId,
+                            kafkaMetadataService,
+                            enumContext,
+                            signalNoMoreSplitsCallback);
         }
     }
 }
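
On the proxy side, the hand-off is deliberately small: each per-cluster proxy
records its own terminal state and, when a callback was supplied (the
enumerator only passes one for bounded sources), nudges the parent to
re-evaluate the global condition. A simplified stand-in, not the real class:

    // Simplified stand-in for the StoppableKafkaEnumContextProxy callback wiring.
    class ContextProxySketch {
        private final Runnable signalNoMoreSplitsCallback; // null in unbounded mode
        private boolean noMoreSplits = false;

        ContextProxySketch(Runnable signalNoMoreSplitsCallback) {
            this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback;
        }

        void signalNoMoreSplits(int subtask) {
            noMoreSplits = true;
            if (signalNoMoreSplitsCallback != null) {
                // Idempotent by construction: the parent only acts once ALL
                // proxies report noMoreSplits, so repeat invocations are harmless.
                signalNoMoreSplitsCallback.run();
            }
        }

        boolean isNoMoreSplits() {
            return noMoreSplits;
        }
    }

Passing null, as the test changes below do, keeps existing call sites working
without opting into the callback.
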
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index f7193418..4f307e11 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -132,8 +132,8 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
 
     @Override
     public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
-        // do not return end of input if no more splits has not yet been signaled
-        if (!isNoMoreSplits && clusterReaderMap.isEmpty()) {
+        // at startup, do not return end of input if metadata event has not been received
+        if (clusterReaderMap.isEmpty()) {
             return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
         }
 
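The reader now treats an empty clusterReaderMap purely as "metadata event not
yet received" and no longer consults the no-more-splits flag in that guard, so
a bounded job cannot report end of input before its sub-readers exist.
Roughly, as a self-contained sketch (the InputStatus enum here is a local
stand-in for Flink's):

    // Sketch of the pollNext() startup guard.
    enum InputStatus { NOTHING_AVAILABLE, MORE_AVAILABLE, END_OF_INPUT }

    class ReaderStartupGuardSketch {
        private final java.util.Map<String, Object> clusterReaderMap = new java.util.HashMap<>();

        InputStatus pollNext() {
            // Until the first metadata event populates clusterReaderMap, keep
            // reporting NOTHING_AVAILABLE; returning END_OF_INPUT here would
            // let a bounded job finish before it ever read a record.
            if (clusterReaderMap.isEmpty()) {
                return InputStatus.NOTHING_AVAILABLE;
            }
            return InputStatus.MORE_AVAILABLE; // real code delegates to sub-readers
        }
    }
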
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
index 05046d40..3c3a76e8 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
@@ -919,11 +919,13 @@ public class DynamicKafkaSourceEnumeratorTest {
 
     private static class TestKafkaEnumContextProxyFactory
            implements StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory {
+
         @Override
         public StoppableKafkaEnumContextProxy create(
                 SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
                 String kafkaClusterId,
-                KafkaMetadataService kafkaMetadataService) {
+                KafkaMetadataService kafkaMetadataService,
+                Runnable signalNoMoreSplitsCallback) {
             return new TestKafkaEnumContextProxy(
                     kafkaClusterId,
                     kafkaMetadataService,
@@ -939,7 +941,7 @@ public class DynamicKafkaSourceEnumeratorTest {
                 String kafkaClusterId,
                 KafkaMetadataService kafkaMetadataService,
                MockSplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
-            super(kafkaClusterId, kafkaMetadataService, enumContext);
+            super(kafkaClusterId, kafkaMetadataService, enumContext, null);
             this.enumContext = enumContext;
         }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
index 694c95c6..e3dbf4fd 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
@@ -150,7 +150,8 @@ public class StoppableKafkaEnumContextProxyTest {
         return new StoppableKafkaEnumContextProxy(
                 contextKafkaCluster,
                new MockKafkaMetadataService(Collections.singleton(mockStream)),
-                enumContext);
+                enumContext,
+                null);
     }
 
    // this modeled after `KafkaSourceEnumerator` topic partition subscription to throw the same
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
index c6ecfd06..0ec02cc0 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
@@ -204,6 +204,7 @@ public class DynamicKafkaSourceTestHelper extends KafkaTestBase {
             throws Throwable {
         Properties props = new Properties();
         props.putAll(clusterProperties);
+        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName());
         props.setProperty(
                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName());
@@ -219,8 +220,9 @@ public class DynamicKafkaSourceTestHelper extends KafkaTestBase {
                 };
         try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
             for (ProducerRecord<K, V> record : records) {
-                producer.send(record, callback).get();
+                producer.send(record, callback);
             }
+            producer.flush();
         }
         if (sendingError.get() != null) {
             throw sendingError.get();
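
The test-helper fix replaces the per-record send(record, callback).get() with
asynchronous sends followed by a single flush(), and sets acks=all so the
producer only treats a record as sent once it is fully replicated. flush()
blocks until every buffered record has completed, so any send failure is
visible in sendingError before the helper returns. A self-contained version of
the pattern (the bootstrap address and String serializers are assumptions for
the sketch):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicReference;

    public class ReliableTestProducer {
        public static void produce(List<ProducerRecord<String, String>> records) throws Throwable {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // wait for full ISR acknowledgement
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Record only the first failure, matching the helper's sendingError pattern.
            AtomicReference<Throwable> sendingError = new AtomicReference<>();
            Callback callback = (metadata, exception) -> {
                if (exception != null) {
                    sendingError.compareAndSet(null, exception);
                }
            };

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (ProducerRecord<String, String> record : records) {
                    // Asynchronous send; per-record get() would serialize sends
                    // without adding durability beyond what acks=all provides.
                    producer.send(record, callback);
                }
                // Block until every previously sent record is acknowledged.
                producer.flush();
            }
            if (sendingError.get() != null) {
                throw sendingError.get();
            }
        }
    }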
