This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8036e49a6e8 KAFKA-17554 Flaky testFutureCompletionOutsidePoll in
ConsumerNetworkClientTest (#18298)
8036e49a6e8 is described below
commit 8036e49a6e8ffe5bb1ab4fad88482853f615efce
Author: Ken Huang <[email protected]>
AuthorDate: Wed Sep 24 18:42:25 2025 +0800
KAFKA-17554 Flaky testFutureCompletionOutsidePoll in
ConsumerNetworkClientTest (#18298)
Jira: https://issues.apache.org/jira/browse/KAFKA-17554
In the previous workflow, the test passes under two conditions:
1. The `t1` thread is waiting for the main thread's `client.wakeup()`.
If successful, `t1` will wake up `t2`, allowing `t2` to complete the
future.
2. If `t1` fails to receive the `client.wakeup()` from the main thread,
`t2` will be woken up by the main thread.
In the previous implementation, we used a `CountDownLatch` to control
the execution of three threads, but it often led to race conditions.
Currently, we have modified it to use two threads to test this scenario.
I run `I=0; while ./gradlew :clients:test --tests
ConsumerNetworkClientTest.testFutureCompletionOutsidePoll --rerun
--fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done`
and pass 3000+ times.

Reviewers: Chia-Ping Tsai <[email protected]>
---
.../internals/ConsumerNetworkClientTest.java | 36 ++++++++--------------
1 file changed, 12 insertions(+), 24 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index b5ab39e62c7..1f5551e7df1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -266,45 +265,34 @@ public class ConsumerNetworkClientTest {
assertEquals(metadataException, exc);
}
- @Disabled("KAFKA-17554")
@Test
public void testFutureCompletionOutsidePoll() throws Exception {
// Tests the scenario in which the request that is being awaited in
one thread
// is received and completed in another thread.
-
- final CountDownLatch t1TheardCountDownLatch = new CountDownLatch(1);
- final CountDownLatch t2ThreadCountDownLatch = new CountDownLatch(2);
-
final RequestFuture<ClientResponse> future = consumerClient.send(node,
heartbeat());
consumerClient.pollNoWakeup(); // dequeue and send the request
+ CountDownLatch bothThreadsReady = new CountDownLatch(2);
+
client.enableBlockingUntilWakeup(2);
- Thread t1 = new Thread(() -> {
- t1TheardCountDownLatch.countDown();
+
+ Thread t1 = new Thread(() -> {
+ bothThreadsReady.countDown();
consumerClient.pollNoWakeup();
- t2ThreadCountDownLatch.countDown();
});
-
- t1.start();
Thread t2 = new Thread(() -> {
- try {
- t2ThreadCountDownLatch.await();
- consumerClient.poll(future);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ bothThreadsReady.countDown();
+ consumerClient.poll(future);
});
+
+ t1.start();
t2.start();
-
- // Simulate a network response and return from the poll in t1
+
+ // Wait until both threads are blocked in poll
+ bothThreadsReady.await();
client.respond(heartbeatResponse(Errors.NONE));
- // Wait for t1 to block in poll
- t1TheardCountDownLatch.await();
-
client.wakeup();
- // while t1 is blocked in poll, t2 should be able to complete the
future
- t2ThreadCountDownLatch.countDown();
// Both threads should complete since t1 should wakeup t2
t1.join();