This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff44f5e0a51 KAFKA-17554 Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest (#17217)
ff44f5e0a51 is described below
commit ff44f5e0a516fc68e1a2c237947f752142999732
Author: Ken Huang <[email protected]>
AuthorDate: Tue Dec 3 08:58:56 2024 +0800
KAFKA-17554 Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest (#17217)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../internals/ConsumerNetworkClientTest.java | 33 ++++++++++++++++------
1 file changed, 24 insertions(+), 9 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index a21001d5102..fa2a1f19bdd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -268,26 +269,40 @@ public class ConsumerNetworkClientTest {
public void testFutureCompletionOutsidePoll() throws Exception {
// Tests the scenario in which the request that is being awaited in one thread
// is received and completed in another thread.
+
+ final CountDownLatch t1TheardCountDownLatch = new CountDownLatch(1);
+ final CountDownLatch t2ThreadCountDownLatch = new CountDownLatch(2);
final RequestFuture<ClientResponse> future = consumerClient.send(node,
heartbeat());
consumerClient.pollNoWakeup(); // dequeue and send the request
client.enableBlockingUntilWakeup(2);
- Thread t1 = new Thread(() -> consumerClient.pollNoWakeup());
+ Thread t1 = new Thread(() -> {
+ t1TheardCountDownLatch.countDown();
+ consumerClient.pollNoWakeup();
+ t2ThreadCountDownLatch.countDown();
+ });
+
t1.start();
- // Sleep a little so that t1 is blocking in poll
- Thread.sleep(50);
-
- Thread t2 = new Thread(() -> consumerClient.poll(future));
+ Thread t2 = new Thread(() -> {
+ try {
+ t2ThreadCountDownLatch.await();
+ consumerClient.poll(future);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
t2.start();
-
- // Sleep a little so that t2 is awaiting the network client lock
- Thread.sleep(50);
-
+
// Simulate a network response and return from the poll in t1
client.respond(heartbeatResponse(Errors.NONE));
+ // Wait for t1 to block in poll
+ t1TheardCountDownLatch.await();
+
client.wakeup();
+ // while t1 is blocked in poll, t2 should be able to complete the future
+ t2ThreadCountDownLatch.countDown();
// Both threads should complete since t1 should wakeup t2
t1.join();