This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new cdfa328b [FLINK-32416] Fix flaky tests by ensuring test utilities
produce records consistently and by cleaning up the no-more-splits
notification so that it is always sent. This closes #79
cdfa328b is described below
commit cdfa328b5ec34d711ae2c9e93de6de7565fd1db6
Author: Mason Chen <[email protected]>
AuthorDate: Wed Jan 17 12:55:29 2024 -0800
[FLINK-32416] Fix flaky tests by ensuring test utilities produce records
consistently and by cleaning up the no-more-splits notification so that it
is always sent. This closes #79
---
.../enumerator/DynamicKafkaSourceEnumerator.java | 49 ++++++++++++----------
.../enumerator/StoppableKafkaEnumContextProxy.java | 29 ++++++++++---
.../source/reader/DynamicKafkaSourceReader.java | 4 +-
.../DynamicKafkaSourceEnumeratorTest.java | 6 ++-
.../StoppableKafkaEnumContextProxyTest.java | 3 +-
.../kafka/DynamicKafkaSourceTestHelper.java | 4 +-
6 files changed, 63 insertions(+), 32 deletions(-)
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index cce8ab28..e14a36d9 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -96,6 +96,7 @@ public class DynamicKafkaSourceEnumerator
private int kafkaMetadataServiceDiscoveryFailureCount;
private Map<String, Set<String>> latestClusterTopicsMap;
private Set<KafkaStream> latestKafkaStreams;
+ private boolean firstDiscoveryComplete;
public DynamicKafkaSourceEnumerator(
KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -151,6 +152,7 @@ public class DynamicKafkaSourceEnumerator
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD,
Integer::parseInt);
this.kafkaMetadataServiceDiscoveryFailureCount = 0;
+ this.firstDiscoveryComplete = false;
this.kafkaMetadataService = kafkaMetadataService;
this.stoppableKafkaEnumContextProxyFactory =
stoppableKafkaEnumContextProxyFactory;
@@ -212,32 +214,27 @@ public class DynamicKafkaSourceEnumerator
private void handleNoMoreSplits() {
if (Boundedness.BOUNDED.equals(boundedness)) {
- enumContext.runInCoordinatorThread(
- () -> {
- boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
- for (StoppableKafkaEnumContextProxy context :
- clusterEnumContextMap.values()) {
- allEnumeratorsHaveSignalledNoMoreSplits =
- allEnumeratorsHaveSignalledNoMoreSplits
- && context.isNoMoreSplits();
- }
-
- if (allEnumeratorsHaveSignalledNoMoreSplits) {
- logger.info(
- "Signal no more splits to all readers: {}",
- enumContext.registeredReaders().keySet());
- enumContext
- .registeredReaders()
- .keySet()
- .forEach(enumContext::signalNoMoreSplits);
- }
- });
+ boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
+ for (StoppableKafkaEnumContextProxy context :
clusterEnumContextMap.values()) {
+ allEnumeratorsHaveSignalledNoMoreSplits =
+ allEnumeratorsHaveSignalledNoMoreSplits &&
context.isNoMoreSplits();
+ }
+
+ if (firstDiscoveryComplete &&
allEnumeratorsHaveSignalledNoMoreSplits) {
+ logger.info(
+ "Signal no more splits to all readers: {}",
+ enumContext.registeredReaders().keySet());
+
enumContext.registeredReaders().keySet().forEach(enumContext::signalNoMoreSplits);
+ } else {
+ logger.info("Not ready to notify no more splits to readers.");
+ }
}
}
// --------------- private methods for metadata discovery ---------------
private void onHandleSubscribedStreamsFetch(Set<KafkaStream>
fetchedKafkaStreams, Throwable t) {
+ firstDiscoveryComplete = true;
Set<KafkaStream> handledFetchKafkaStreams =
handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);
@@ -370,9 +367,19 @@ public class DynamicKafkaSourceEnumerator
Set<String> topics,
KafkaSourceEnumState kafkaSourceEnumState,
Properties fetchedProperties) {
+ final Runnable signalNoMoreSplitsCallback;
+ if (Boundedness.BOUNDED.equals(boundedness)) {
+ signalNoMoreSplitsCallback = this::handleNoMoreSplits;
+ } else {
+ signalNoMoreSplitsCallback = null;
+ }
+
StoppableKafkaEnumContextProxy context =
stoppableKafkaEnumContextProxyFactory.create(
- enumContext, kafkaClusterId, kafkaMetadataService);
+ enumContext,
+ kafkaClusterId,
+ kafkaMetadataService,
+ signalNoMoreSplitsCallback);
Properties consumerProps = new Properties();
KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
index 5506c444..752a5d6b 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -69,6 +71,7 @@ public class StoppableKafkaEnumContextProxy
private final KafkaMetadataService kafkaMetadataService;
private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
private final ScheduledExecutorService subEnumeratorWorker;
+ private final Runnable signalNoMoreSplitsCallback;
private boolean noMoreSplits = false;
private volatile boolean isClosing;
@@ -79,17 +82,20 @@ public class StoppableKafkaEnumContextProxy
* KafkaSourceEnumerator
* @param kafkaMetadataService the Kafka metadata service to facilitate
error handling
* @param enumContext the underlying enumerator context
+ * @param signalNoMoreSplitsCallback the callback when signal no more
splits is invoked
*/
public StoppableKafkaEnumContextProxy(
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService,
- SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext) {
+ SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
+ @Nullable Runnable signalNoMoreSplitsCallback) {
this.kafkaClusterId = kafkaClusterId;
this.kafkaMetadataService = kafkaMetadataService;
this.enumContext = enumContext;
this.subEnumeratorWorker =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory(kafkaClusterId +
"-enum-worker"));
+ this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback;
this.isClosing = false;
}
@@ -147,8 +153,14 @@ public class StoppableKafkaEnumContextProxy
@Override
public void signalNoMoreSplits(int subtask) {
- // there are no more splits for this cluster
+ // There are no more splits for this cluster, but we need to wait
until all clusters are
+ // finished with their respective split discoveries. In the Kafka
Source, this is called in
+ // the coordinator thread, ensuring thread safety, for all source
readers at the same time.
noMoreSplits = true;
+ if (signalNoMoreSplitsCallback != null) {
+ // Thread safe idempotent callback
+ signalNoMoreSplitsCallback.run();
+ }
}
/** Execute the one time callables in the coordinator. */
@@ -286,12 +298,19 @@ public class StoppableKafkaEnumContextProxy
StoppableKafkaEnumContextProxy create(
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
String kafkaClusterId,
- KafkaMetadataService kafkaMetadataService);
+ KafkaMetadataService kafkaMetadataService,
+ Runnable signalNoMoreSplitsCallback);
static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
- return (enumContext, kafkaClusterId, kafkaMetadataService) ->
+ return (enumContext,
+ kafkaClusterId,
+ kafkaMetadataService,
+ signalNoMoreSplitsCallback) ->
new StoppableKafkaEnumContextProxy(
- kafkaClusterId, kafkaMetadataService, enumContext);
+ kafkaClusterId,
+ kafkaMetadataService,
+ enumContext,
+ signalNoMoreSplitsCallback);
}
}
}
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index f7193418..4f307e11 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -132,8 +132,8 @@ public class DynamicKafkaSourceReader<T> implements
SourceReader<T, DynamicKafka
@Override
public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception
{
- // do not return end of input if no more splits has not yet been
signaled
- if (!isNoMoreSplits && clusterReaderMap.isEmpty()) {
+ // at startup, do not return end of input if metadata event has not
been received
+ if (clusterReaderMap.isEmpty()) {
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
}
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
index 05046d40..3c3a76e8 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumeratorTest.java
@@ -919,11 +919,13 @@ public class DynamicKafkaSourceEnumeratorTest {
private static class TestKafkaEnumContextProxyFactory
implements
StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory {
+
@Override
public StoppableKafkaEnumContextProxy create(
SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext,
String kafkaClusterId,
- KafkaMetadataService kafkaMetadataService) {
+ KafkaMetadataService kafkaMetadataService,
+ Runnable signalNoMoreSplitsCallback) {
return new TestKafkaEnumContextProxy(
kafkaClusterId,
kafkaMetadataService,
@@ -939,7 +941,7 @@ public class DynamicKafkaSourceEnumeratorTest {
String kafkaClusterId,
KafkaMetadataService kafkaMetadataService,
MockSplitEnumeratorContext<DynamicKafkaSourceSplit>
enumContext) {
- super(kafkaClusterId, kafkaMetadataService, enumContext);
+ super(kafkaClusterId, kafkaMetadataService, enumContext, null);
this.enumContext = enumContext;
}
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
index 694c95c6..e3dbf4fd 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxyTest.java
@@ -150,7 +150,8 @@ public class StoppableKafkaEnumContextProxyTest {
return new StoppableKafkaEnumContextProxy(
contextKafkaCluster,
new
MockKafkaMetadataService(Collections.singleton(mockStream)),
- enumContext);
+ enumContext,
+ null);
}
// this modeled after `KafkaSourceEnumerator` topic partition subscription
to throw the same
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
index c6ecfd06..0ec02cc0 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/DynamicKafkaSourceTestHelper.java
@@ -204,6 +204,7 @@ public class DynamicKafkaSourceTestHelper extends
KafkaTestBase {
throws Throwable {
Properties props = new Properties();
props.putAll(clusterProperties);
+ props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializerClass.getName());
props.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializerClass.getName());
@@ -219,8 +220,9 @@ public class DynamicKafkaSourceTestHelper extends
KafkaTestBase {
};
try (KafkaProducer<K, V> producer = new KafkaProducer<>(props)) {
for (ProducerRecord<K, V> record : records) {
- producer.send(record, callback).get();
+ producer.send(record, callback);
}
+ producer.flush();
}
if (sendingError.get() != null) {
throw sendingError.get();