This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebe7dd2b15c KAFKA-18418: Use CDL to block the thread termination to 
avoid flaky tests (#18418)
ebe7dd2b15c is described below

commit ebe7dd2b15c8cc13fc28a5182180ac3280f4f078
Author: Ao Li <[email protected]>
AuthorDate: Thu Jan 9 13:19:40 2025 -0500

    KAFKA-18418: Use CDL to block the thread termination to avoid flaky tests 
(#18418)
    
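    This PR fixes a race condition in KafkaStreamsTest by replacing the
    timing-dependent Thread.sleep(2000L) in the mocked StreamThread#join with
    a CountDownLatch, which is released when the KafkaStreams instance under
    test is closed, to fully control how long the thread blocks.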
    
    Reviewers: David Arthur <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 71 ++++++++++++++++------
 1 file changed, 54 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab35530abd1..1516dfb5eda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -89,6 +89,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -145,7 +146,7 @@ public class KafkaStreamsTest {
     private Properties props;
     private MockAdminClient adminClient;
     private StateListenerStub streamsStateListener;
-    
+
     @Mock
     private StreamThread streamThreadOne;
     @Mock
@@ -344,13 +345,49 @@ public class KafkaStreamsTest {
         }).when(thread).start();
     }
 
+    private CountDownLatch terminableThreadBlockingLatch = new 
CountDownLatch(1);
+
     private void prepareTerminableThread(final StreamThread thread) throws 
InterruptedException {
         doAnswer(invocation -> {
-            Thread.sleep(2000L);
+            terminableThreadBlockingLatch.await();
             return null;
         }).when(thread).join();
     }
 
+    private class KafkaStreamsWithTerminableThread extends KafkaStreams {
+
+        KafkaStreamsWithTerminableThread(final Topology topology,
+                                         final Properties props,
+                                         final KafkaClientSupplier 
clientSupplier,
+                                         final Time time) {
+            super(topology, props, clientSupplier, time);
+        }
+
+
+        KafkaStreamsWithTerminableThread(final Topology topology,
+                                         final Properties props,
+                                         final KafkaClientSupplier 
clientSupplier) {
+            super(topology, props, clientSupplier);
+        }
+
+        KafkaStreamsWithTerminableThread(final Topology topology,
+                                         final StreamsConfig 
applicationConfigs) {
+            super(topology, applicationConfigs);
+        }
+
+        KafkaStreamsWithTerminableThread(final Topology topology,
+                                         final StreamsConfig 
applicationConfigs,
+                                         final KafkaClientSupplier 
clientSupplier) {
+            super(topology, applicationConfigs, clientSupplier);
+        }
+
+        @Override
+        public void close() {
+            terminableThreadBlockingLatch.countDown();
+            super.close();
+        }
+    }
+
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
         prepareStreams();
@@ -947,7 +984,7 @@ public class KafkaStreamsTest {
         prepareThreadState(streamThreadOne, state1);
         prepareThreadState(streamThreadTwo, state2);
         prepareTerminableThread(streamThreadOne);
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier, time)) {
             streams.start();
             waitForCondition(
                 () -> streams.state() == KafkaStreams.State.RUNNING,
@@ -972,7 +1009,7 @@ public class KafkaStreamsTest {
 
         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
 
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
mockClientSupplier, time)) {
             streams.start();
             waitForCondition(
                 () -> streams.state() == KafkaStreams.State.RUNNING,
@@ -997,7 +1034,7 @@ public class KafkaStreamsTest {
         prepareThreadState(streamThreadOne, state1);
         prepareThreadState(streamThreadTwo, state2);
         prepareTerminableThread(streamThreadOne);
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier, time)) {
             streams.start();
             waitForCondition(
                 () -> streams.state() == KafkaStreams.State.RUNNING,
@@ -1154,7 +1191,7 @@ public class KafkaStreamsTest {
         prepareTerminableThread(streamThreadOne);
 
         // do not use mock time so that it can really elapse
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier)) {
             assertFalse(streams.close(Duration.ofMillis(10L)));
         }
     }
@@ -1166,7 +1203,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);
 
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> 
streams.close(Duration.ofMillis(-1L)));
         }
     }
@@ -1178,7 +1215,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);
 
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier, time)) {
             // with mock time that does not elapse, close would not return if 
it ever waits on the state transition
             assertFalse(streams.close(Duration.ZERO));
         }
@@ -1193,7 +1230,7 @@ public class KafkaStreamsTest {
 
         final KafkaStreams.CloseOptions closeOptions = new 
KafkaStreams.CloseOptions();
         closeOptions.timeout(Duration.ofMillis(10L));
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier)) {
             assertFalse(streams.close(closeOptions));
         }
     }
@@ -1207,7 +1244,7 @@ public class KafkaStreamsTest {
 
         final KafkaStreams.CloseOptions closeOptions = new 
KafkaStreams.CloseOptions();
         closeOptions.timeout(Duration.ofMillis(-1L));
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> 
streams.close(closeOptions));
         }
     }
@@ -1221,7 +1258,7 @@ public class KafkaStreamsTest {
 
         final KafkaStreams.CloseOptions closeOptions = new 
KafkaStreams.CloseOptions();
         closeOptions.timeout(Duration.ZERO);
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier)) {
             assertFalse(streams.close(closeOptions));
         }
     }
@@ -1240,7 +1277,7 @@ public class KafkaStreamsTest {
         final KafkaStreams.CloseOptions closeOptions = new 
KafkaStreams.CloseOptions();
         closeOptions.timeout(Duration.ofMillis(10L));
         closeOptions.leaveGroup(true);
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
mockClientSupplier)) {
             assertFalse(streams.close(closeOptions));
         }
     }
@@ -1258,7 +1295,7 @@ public class KafkaStreamsTest {
         final KafkaStreams.CloseOptions closeOptions = new 
KafkaStreams.CloseOptions();
         closeOptions.timeout(Duration.ofMillis(-1L));
         closeOptions.leaveGroup(true);
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
mockClientSupplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> 
streams.close(closeOptions));
         }
     }
@@ -1277,7 +1314,7 @@ public class KafkaStreamsTest {
         final KafkaStreams.CloseOptions closeOptions = new 
KafkaStreams.CloseOptions();
         closeOptions.timeout(Duration.ZERO);
         closeOptions.leaveGroup(true);
-        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier)) {
+        try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
mockClientSupplier)) {
             assertFalse(streams.close(closeOptions));
         }
     }
@@ -1300,7 +1337,7 @@ public class KafkaStreamsTest {
             builder.table("topic", Materialized.as("store"));
             props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, 
RecordingLevel.DEBUG.name());
 
-            try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            try (final KafkaStreams streams = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, 
supplier, time)) {
                 streams.start();
             }
 
@@ -1324,7 +1361,7 @@ public class KafkaStreamsTest {
         final StreamsConfig mockConfig = spy(config);
         when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier);
 
-        try (final KafkaStreams ignored = new 
KafkaStreams(getBuilderWithSource().build(), mockConfig)) {
+        try (final KafkaStreams ignored = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), mockConfig)) {
             // no-op
         }
         // It's called once in above when mock
@@ -1361,7 +1398,7 @@ public class KafkaStreamsTest {
         final StreamsConfig config = new StreamsConfig(props);
         final StreamsConfig mockConfig = spy(config);
 
-        try (final KafkaStreams ignored = new 
KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier)) {
+        try (final KafkaStreams ignored = new 
KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), mockConfig, 
supplier)) {
             // no-op
         }
         // It's called once in above when mock

Reply via email to