This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16e976ac8e6 KAFKA-20423: Fix flakiness of 
testWakeupWithFetchDataAvailable (#22364)
16e976ac8e6 is described below

commit 16e976ac8e6ff5bc79aaedb9ab6518bce1ea81bb
Author: ChickenchickenLove <[email protected]>
AuthorDate: Tue Jun 2 22:33:57 2026 +0900

    KAFKA-20423: Fix flakiness of testWakeupWithFetchDataAvailable (#22364)
    
    ### Description
    
    I couldn't reproduce the failure locally. This PR is based on the
    following assumed interleaving and fixes the non-deterministic path it causes.
    
    1. The foreground test thread puts the fetch response into
    `MockClient.responses` via `respondFrom(...)`.
    2. The heartbeat thread enters `MockClient.poll()` first.
    3. The heartbeat thread removes the fetch response from the queue by
    calling `responses.poll()`.
    4. However, it gets descheduled before calling `response.onComplete()`.
    5. The foreground thread calls `client.poll(0, ...)`, but since the
    response queue is empty, it completes nothing and returns.
    6. `consumer.wakeup()` is called.
    7. The first `consumer.poll(Duration.ZERO)` immediately throws
    `WakeupException` from `maybeTriggerWakeup()`.
    8. `consumer.position(tp0)` already has a valid position of 0, so it
    returns 0 directly without performing a network poll.
    9. The next `consumer.poll(Duration.ZERO)` also returns no records,
    because the response is neither in `pendingCompletion` nor in the
    `FetchBuffer` yet.
    10. After that, the heartbeat thread wakes up and calls
    `response.onComplete()`, but by then the assertion has already
    failed.
    
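    Although I couldn't reproduce it locally, this scenario seems plausible.
    The test now records the correlation id of the outstanding fetch request
    and repeatedly calls `client.poll(...)` until a poll has returned that
    fetch response, so the fetch data is known to be available before
    `consumer.wakeup()` is called. I plan to monitor the CI trend afterward
    to confirm whether the flakiness is resolved.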
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 27 +++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7dcd5826b32..c720ccc051c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
@@ -1509,7 +1510,21 @@ public class KafkaConsumerTest {
     @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
     public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) 
throws Exception {
         ConsumerMetadata metadata = createMetadata(subscription);
-        MockClient client = new MockClient(time, metadata);
+
+        AtomicInteger fetchCorrelationId = new AtomicInteger(-1);
+        AtomicBoolean fetchResponseCompleted = new AtomicBoolean(false);
+        MockClient client = new MockClient(time, metadata) {
+            @Override
+            public List<ClientResponse> poll(long timeoutMs, long now) {
+                List<ClientResponse> completed = super.poll(timeoutMs, now);
+                completed.stream()
+                        .filter(response -> response.requestHeader().apiKey() 
== ApiKeys.FETCH)
+                        .filter(response -> 
response.requestHeader().correlationId() == fetchCorrelationId.get())
+                        .findAny()
+                        .ifPresent(response -> 
fetchResponseCompleted.set(true));
+                return completed;
+            }
+        };
 
         initMetadata(client, Map.of(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
@@ -1522,9 +1537,15 @@ public class KafkaConsumerTest {
         consumer.poll(Duration.ZERO);
 
         // respond to the outstanding fetch so that we have data available on 
the next poll
-        client.respondFrom(fetchResponse(tp0, 0, 5), node);
-        client.poll(0, time.milliseconds());
+        ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
+        fetchCorrelationId.set(fetchRequest.correlationId());
 
+        client.respondFrom(fetchResponse(tp0, 0, 5), node);
+        TestUtils.waitForCondition(() -> {
+            client.poll(0, time.milliseconds());
+            return fetchResponseCompleted.get();
+        }, "Fetch response was not completed.");
+        
         consumer.wakeup();
 
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));

Reply via email to