This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff44f5e0a51 KAFKA-17554 Flaky testFutureCompletionOutsidePoll in 
ConsumerNetworkClientTest (#17217)
ff44f5e0a51 is described below

commit ff44f5e0a516fc68e1a2c237947f752142999732
Author: Ken Huang <[email protected]>
AuthorDate: Tue Dec 3 08:58:56 2024 +0800

    KAFKA-17554 Flaky testFutureCompletionOutsidePoll in 
ConsumerNetworkClientTest (#17217)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../internals/ConsumerNetworkClientTest.java       | 33 ++++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index a21001d5102..fa2a1f19bdd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -268,26 +269,40 @@ public class ConsumerNetworkClientTest {
     public void testFutureCompletionOutsidePoll() throws Exception {
         // Tests the scenario in which the request that is being awaited in 
one thread
         // is received and completed in another thread.
+        
+        final CountDownLatch t1TheardCountDownLatch = new CountDownLatch(1);
+        final CountDownLatch t2ThreadCountDownLatch = new CountDownLatch(2);
 
         final RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
         consumerClient.pollNoWakeup(); // dequeue and send the request
 
         client.enableBlockingUntilWakeup(2);
-        Thread t1 = new Thread(() -> consumerClient.pollNoWakeup());
+        Thread t1 = new Thread(() ->  {
+            t1TheardCountDownLatch.countDown();
+            consumerClient.pollNoWakeup();
+            t2ThreadCountDownLatch.countDown();
+        });
+        
         t1.start();
 
-        // Sleep a little so that t1 is blocking in poll
-        Thread.sleep(50);
-
-        Thread t2 = new Thread(() -> consumerClient.poll(future));
+        Thread t2 = new Thread(() -> {
+            try {
+                t2ThreadCountDownLatch.await();
+                consumerClient.poll(future);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
         t2.start();
-
-        // Sleep a little so that t2 is awaiting the network client lock
-        Thread.sleep(50);
-
+        
         // Simulate a network response and return from the poll in t1
         client.respond(heartbeatResponse(Errors.NONE));
+        // Wait for t1 to block in poll
+        t1TheardCountDownLatch.await();
+        
         client.wakeup();
+        // while t1 is blocked in poll, t2 should be able to complete the 
future
+        t2ThreadCountDownLatch.countDown();
 
         // Both threads should complete since t1 should wakeup t2
         t1.join();

Reply via email to