This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ebe7dd2b15c KAFKA-18418: Use CDL to block the thread termination to
avoid flaky tests (#18418)
ebe7dd2b15c is described below
commit ebe7dd2b15c8cc13fc28a5182180ac3280f4f078
Author: Ao Li <[email protected]>
AuthorDate: Thu Jan 9 13:19:40 2025 -0500
KAFKA-18418: Use CDL to block the thread termination to avoid flaky tests
(#18418)
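This PR fixes a race condition in KafkaStreamsTest by replacing a
non-deterministic sleep with a CountDownLatch, giving the test full control
over how long a thread blocks. The mocked StreamThread.join() now waits on the
latch, which the test's KafkaStreams subclass releases in close().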
Reviewers: David Arthur <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../org/apache/kafka/streams/KafkaStreamsTest.java | 71 ++++++++++++++++------
1 file changed, 54 insertions(+), 17 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab35530abd1..1516dfb5eda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -89,6 +89,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -145,7 +146,7 @@ public class KafkaStreamsTest {
private Properties props;
private MockAdminClient adminClient;
private StateListenerStub streamsStateListener;
-
+
@Mock
private StreamThread streamThreadOne;
@Mock
@@ -344,13 +345,49 @@ public class KafkaStreamsTest {
}).when(thread).start();
}
+ private CountDownLatch terminableThreadBlockingLatch = new
CountDownLatch(1);
+
private void prepareTerminableThread(final StreamThread thread) throws
InterruptedException {
doAnswer(invocation -> {
- Thread.sleep(2000L);
+ terminableThreadBlockingLatch.await();
return null;
}).when(thread).join();
}
+ private class KafkaStreamsWithTerminableThread extends KafkaStreams {
+
+ KafkaStreamsWithTerminableThread(final Topology topology,
+ final Properties props,
+ final KafkaClientSupplier
clientSupplier,
+ final Time time) {
+ super(topology, props, clientSupplier, time);
+ }
+
+
+ KafkaStreamsWithTerminableThread(final Topology topology,
+ final Properties props,
+ final KafkaClientSupplier
clientSupplier) {
+ super(topology, props, clientSupplier);
+ }
+
+ KafkaStreamsWithTerminableThread(final Topology topology,
+ final StreamsConfig
applicationConfigs) {
+ super(topology, applicationConfigs);
+ }
+
+ KafkaStreamsWithTerminableThread(final Topology topology,
+ final StreamsConfig
applicationConfigs,
+ final KafkaClientSupplier
clientSupplier) {
+ super(topology, applicationConfigs, clientSupplier);
+ }
+
+ @Override
+ public void close() {
+ terminableThreadBlockingLatch.countDown();
+ super.close();
+ }
+ }
+
@Test
public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
prepareStreams();
@@ -947,7 +984,7 @@ public class KafkaStreamsTest {
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
prepareTerminableThread(streamThreadOne);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier, time)) {
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
@@ -972,7 +1009,7 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
mockClientSupplier, time)) {
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
@@ -997,7 +1034,7 @@ public class KafkaStreamsTest {
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
prepareTerminableThread(streamThreadOne);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier, time)) {
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
@@ -1154,7 +1191,7 @@ public class KafkaStreamsTest {
prepareTerminableThread(streamThreadOne);
// do not use mock time so that it can really elapse
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier)) {
assertFalse(streams.close(Duration.ofMillis(10L)));
}
}
@@ -1166,7 +1203,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier, time)) {
assertThrows(IllegalArgumentException.class, () ->
streams.close(Duration.ofMillis(-1L)));
}
}
@@ -1178,7 +1215,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier, time)) {
// with mock time that does not elapse, close would not return if
it ever waits on the state transition
assertFalse(streams.close(Duration.ZERO));
}
@@ -1193,7 +1230,7 @@ public class KafkaStreamsTest {
final KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier)) {
assertFalse(streams.close(closeOptions));
}
}
@@ -1207,7 +1244,7 @@ public class KafkaStreamsTest {
final KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier, time)) {
assertThrows(IllegalArgumentException.class, () ->
streams.close(closeOptions));
}
}
@@ -1221,7 +1258,7 @@ public class KafkaStreamsTest {
final KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier)) {
assertFalse(streams.close(closeOptions));
}
}
@@ -1240,7 +1277,7 @@ public class KafkaStreamsTest {
final KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
closeOptions.leaveGroup(true);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}
}
@@ -1258,7 +1295,7 @@ public class KafkaStreamsTest {
final KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
closeOptions.leaveGroup(true);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
mockClientSupplier, time)) {
assertThrows(IllegalArgumentException.class, () ->
streams.close(closeOptions));
}
}
@@ -1277,7 +1314,7 @@ public class KafkaStreamsTest {
final KafkaStreams.CloseOptions closeOptions = new
KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}
}
@@ -1300,7 +1337,7 @@ public class KafkaStreamsTest {
builder.table("topic", Materialized.as("store"));
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG,
RecordingLevel.DEBUG.name());
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props,
supplier, time)) {
streams.start();
}
@@ -1324,7 +1361,7 @@ public class KafkaStreamsTest {
final StreamsConfig mockConfig = spy(config);
when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier);
- try (final KafkaStreams ignored = new
KafkaStreams(getBuilderWithSource().build(), mockConfig)) {
+ try (final KafkaStreams ignored = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), mockConfig)) {
// no-op
}
// It's called once in above when mock
@@ -1361,7 +1398,7 @@ public class KafkaStreamsTest {
final StreamsConfig config = new StreamsConfig(props);
final StreamsConfig mockConfig = spy(config);
- try (final KafkaStreams ignored = new
KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier)) {
+ try (final KafkaStreams ignored = new
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), mockConfig,
supplier)) {
// no-op
}
// It's called once in above when mock