This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8036e49a6e8 KAFKA-17554 Flaky testFutureCompletionOutsidePoll in 
ConsumerNetworkClientTest (#18298)
8036e49a6e8 is described below

commit 8036e49a6e8ffe5bb1ab4fad88482853f615efce
Author: Ken Huang <[email protected]>
AuthorDate: Wed Sep 24 18:42:25 2025 +0800

    KAFKA-17554 Flaky testFutureCompletionOutsidePoll in 
ConsumerNetworkClientTest (#18298)
    
    Jira: https://issues.apache.org/jira/browse/KAFKA-17554
    
    In the previous workflow, the test passed under either of two conditions:
    
    1. The `t1` thread is waiting for the main thread's `client.wakeup()`.
    If successful, `t1` will wake up `t2`, allowing `t2` to complete the
    future.
    2. If `t1` fails to receive the `client.wakeup()` from the main thread,
    `t2` will be woken up by the main thread.
    
    In the previous implementation, we used a `CountDownLatch` to control
    the execution of three threads, but it often led to race conditions.
    The test has now been modified to use two threads for this scenario.
    
    I ran `I=0;  while ./gradlew :clients:test --tests
    ConsumerNetworkClientTest.testFutureCompletionOutsidePoll --rerun
    --fail-fast; do  (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done`
    and it passed 3000+ times.
    
    
![image](https://github.com/user-attachments/assets/3b8d804e-fbe0-4030-8686-4960fc717d07)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../internals/ConsumerNetworkClientTest.java       | 36 ++++++++--------------
 1 file changed, 12 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index b5ab39e62c7..1f5551e7df1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -266,45 +265,34 @@ public class ConsumerNetworkClientTest {
         assertEquals(metadataException, exc);
     }
 
-    @Disabled("KAFKA-17554")
     @Test
     public void testFutureCompletionOutsidePoll() throws Exception {
         // Tests the scenario in which the request that is being awaited in 
one thread
         // is received and completed in another thread.
-        
-        final CountDownLatch t1TheardCountDownLatch = new CountDownLatch(1);
-        final CountDownLatch t2ThreadCountDownLatch = new CountDownLatch(2);
-
         final RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
         consumerClient.pollNoWakeup(); // dequeue and send the request
 
+        CountDownLatch bothThreadsReady = new CountDownLatch(2);
+
         client.enableBlockingUntilWakeup(2);
-        Thread t1 = new Thread(() ->  {
-            t1TheardCountDownLatch.countDown();
+
+        Thread t1 = new Thread(() -> {
+            bothThreadsReady.countDown();
             consumerClient.pollNoWakeup();
-            t2ThreadCountDownLatch.countDown();
         });
-        
-        t1.start();
 
         Thread t2 = new Thread(() -> {
-            try {
-                t2ThreadCountDownLatch.await();
-                consumerClient.poll(future);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+            bothThreadsReady.countDown();
+            consumerClient.poll(future);
         });
+
+        t1.start();
         t2.start();
-        
-        // Simulate a network response and return from the poll in t1
+
+        // Wait until both threads are blocked in poll
+        bothThreadsReady.await();
         client.respond(heartbeatResponse(Errors.NONE));
-        // Wait for t1 to block in poll
-        t1TheardCountDownLatch.await();
-        
         client.wakeup();
-        // while t1 is blocked in poll, t2 should be able to complete the 
future
-        t2ThreadCountDownLatch.countDown();
 
         // Both threads should complete since t1 should wakeup t2
         t1.join();

Reply via email to