This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16e976ac8e6 KAFKA-20423: Fix flakiness of testWakeupWithFetchDataAvailable (#22364)
16e976ac8e6 is described below
commit 16e976ac8e6ff5bc79aaedb9ab6518bce1ea81bb
Author: ChickenchickenLove <[email protected]>
AuthorDate: Tue Jun 2 22:33:57 2026 +0900
KAFKA-20423: Fix flakiness of testWakeupWithFetchDataAvailable (#22364)
### Description
I couldn't reproduce the failure locally. This PR is based on the following
assumed interleaving and fixes the non-deterministic path it causes:
1. The foreground test thread puts the fetch response into
`MockClient.responses` via `respondFrom(...)`.
2. The heartbeat thread enters `MockClient.poll()` first.
3. The heartbeat thread removes the fetch response from the queue by
calling `responses.poll()`.
4. However, it gets descheduled before calling `response.onComplete()`.
5. The foreground thread calls `client.poll(0, ...)`, but since the
response queue is empty, it completes nothing and returns.
6. `consumer.wakeup()` is called.
7. The first `consumer.poll(Duration.ZERO)` immediately throws
`WakeupException` from `maybeTriggerWakeup()`.
8. `consumer.position(tp0)` already has a valid position of 0, so it
returns 0 directly without performing a network poll.
9. The next `consumer.poll(Duration.ZERO)` also returns empty because
the response is not yet in `pendingCompletion` and not in the
`FetchBuffer` either.
10. After that, the heartbeat thread wakes up and calls
`response.onComplete()`, but by then the assertion has already
failed.
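The interleaving in steps 1-10 can be modeled outside Kafka with a shared queue and two threads. This is a minimal, hypothetical sketch: `SimpleClient`, `PendingResponse`, and the latches are illustrative assumptions, not Kafka's `MockClient` or the real heartbeat thread. The latches force the ordering that the description assumes happens only occasionally in CI, so the "response dequeued but not yet completed" window becomes deterministic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical, simplified model of steps 1-10.
 * SimpleClient is NOT Kafka's MockClient; it only mimics "dequeue a response, then complete it".
 */
public class FetchRaceSketch {

    /** Stand-in for a queued fetch response whose callback marks it completed. */
    static final class PendingResponse {
        final AtomicBoolean completed = new AtomicBoolean(false);
        void onComplete() { completed.set(true); }
    }

    static final class SimpleClient {
        final Queue<PendingResponse> responses = new ConcurrentLinkedQueue<>();

        /** Completes every response still in the queue; returns how many it completed. */
        int poll() {
            int count = 0;
            PendingResponse r;
            while ((r = responses.poll()) != null) {
                r.onComplete();
                count++;
            }
            return count;
        }
    }

    /** What the foreground thread observed, plus the final state after the heartbeat resumes. */
    static final class Outcome {
        int completedByForegroundPoll;
        boolean completedWhenAssertionsRun;
        boolean completedEventually;
    }

    static Outcome runInterleaving() throws InterruptedException {
        SimpleClient client = new SimpleClient();
        PendingResponse fetch = new PendingResponse();
        CountDownLatch dequeued = new CountDownLatch(1); // heartbeat has removed the response
        CountDownLatch resume = new CountDownLatch(1);   // lets the "descheduled" heartbeat continue

        // Step 1: the test thread queues the fetch response.
        client.responses.add(fetch);

        // Steps 2-4: the heartbeat thread removes it, then stalls before onComplete().
        Thread heartbeat = new Thread(() -> {
            PendingResponse r = client.responses.poll();
            dequeued.countDown();
            try {
                resume.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (r != null) {
                r.onComplete(); // Step 10: completes only after the foreground has moved on.
            }
        });
        heartbeat.start();
        dequeued.await();

        Outcome outcome = new Outcome();
        // Step 5: the foreground poll sees an empty queue and completes nothing.
        outcome.completedByForegroundPoll = client.poll();
        // Steps 6-9 would run here; the response is still not completed.
        outcome.completedWhenAssertionsRun = fetch.completed.get();

        resume.countDown();
        heartbeat.join();
        outcome.completedEventually = fetch.completed.get();
        return outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        Outcome o = runInterleaving();
        System.out.println("completed by foreground poll: " + o.completedByForegroundPoll);
        System.out.println("completed when assertions run: " + o.completedWhenAssertionsRun);
        System.out.println("completed eventually: " + o.completedEventually);
    }
}
```

With the latches forcing this order, the foreground poll completes 0 responses, the response is still not completed when the assertions would run, and it becomes completed only after the heartbeat thread resumes.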
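Although I couldn't reproduce it locally, this scenario seems plausible.
This PR therefore records the fetch request's correlation ID and keeps
polling until the matching response has been completed, instead of
calling `client.poll(0, ...)` once. I plan to monitor the CI trend
afterward to confirm whether the flakiness is resolved.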
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumerTest.java | 27 +++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7dcd5826b32..c720ccc051c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
@@ -1509,7 +1510,21 @@ public class KafkaConsumerTest {
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws Exception {
ConsumerMetadata metadata = createMetadata(subscription);
- MockClient client = new MockClient(time, metadata);
+
+ AtomicInteger fetchCorrelationId = new AtomicInteger(-1);
+ AtomicBoolean fetchResponseCompleted = new AtomicBoolean(false);
+ MockClient client = new MockClient(time, metadata) {
+ @Override
+ public List<ClientResponse> poll(long timeoutMs, long now) {
+ List<ClientResponse> completed = super.poll(timeoutMs, now);
+ completed.stream()
+ .filter(response -> response.requestHeader().apiKey() == ApiKeys.FETCH)
+ .filter(response -> response.requestHeader().correlationId() == fetchCorrelationId.get())
+ .findAny()
+ .ifPresent(response -> fetchResponseCompleted.set(true));
+ return completed;
+ }
+ };
initMetadata(client, Map.of(topic, 1));
Node node = metadata.fetch().nodes().get(0);
@@ -1522,9 +1537,15 @@ public class KafkaConsumerTest {
consumer.poll(Duration.ZERO);
// respond to the outstanding fetch so that we have data available on the next poll
- client.respondFrom(fetchResponse(tp0, 0, 5), node);
- client.poll(0, time.milliseconds());
+ ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
+ fetchCorrelationId.set(fetchRequest.correlationId());
+ client.respondFrom(fetchResponse(tp0, 0, 5), node);
+ TestUtils.waitForCondition(() -> {
+ client.poll(0, time.milliseconds());
+ return fetchResponseCompleted.get();
+ }, "Fetch response was not completed.");
+
consumer.wakeup();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
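The pattern in the diff above (remember the fetch request's correlation ID, detect that response in an overridden `poll`, and retry `poll` until it has been seen) can be sketched outside Kafka as follows. This is a hypothetical model: `SimpleClient`, `TrackingClient`, `pollUntil`, and the correlation ID `42` are illustrative assumptions, not Kafka's `MockClient` or `TestUtils.waitForCondition`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class WaitForTrackedResponseSketch {

    /** Simplified response: a correlation ID and a completion flag. */
    static final class Response {
        final int correlationId;
        final AtomicBoolean completed = new AtomicBoolean(false);
        Response(int correlationId) { this.correlationId = correlationId; }
        void onComplete() { completed.set(true); }
    }

    /** Hypothetical stand-in for MockClient: poll() completes and returns queued responses. */
    static class SimpleClient {
        final Queue<Response> responses = new ConcurrentLinkedQueue<>();

        List<Response> poll() {
            List<Response> done = new ArrayList<>();
            Response r;
            while ((r = responses.poll()) != null) {
                r.onComplete();
                done.add(r);
            }
            return done;
        }
    }

    /** Records when the tracked response is returned by ANY thread's poll(), after onComplete() ran. */
    static final class TrackingClient extends SimpleClient {
        final AtomicInteger trackedCorrelationId = new AtomicInteger(-1);
        final AtomicBoolean trackedCompleted = new AtomicBoolean(false);

        @Override
        List<Response> poll() {
            List<Response> done = super.poll();
            done.stream()
                .filter(r -> r.correlationId == trackedCorrelationId.get())
                .findAny()
                .ifPresent(r -> trackedCompleted.set(true));
            return done;
        }
    }

    /** Hypothetical retry helper: re-evaluates the condition until it holds or the timeout expires. */
    static boolean pollUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(1);
        }
        return condition.getAsBoolean();
    }

    static boolean run() throws InterruptedException {
        TrackingClient client = new TrackingClient();
        client.trackedCorrelationId.set(42); // hypothetical correlation ID of the fetch request
        Response fetch = new Response(42);
        client.responses.add(fetch);

        // A background "heartbeat" thread races the foreground for the same queue.
        Thread heartbeat = new Thread(client::poll);
        heartbeat.start();

        // Instead of a single poll, keep polling until the tracked response is known to be completed.
        boolean ok = pollUntil(() -> {
            client.poll();
            return client.trackedCompleted.get();
        }, 5_000);
        heartbeat.join();
        return ok && fetch.completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tracked response completed before proceeding: " + run());
    }
}
```

In this sketch the flag is set only after `super.poll()` has invoked `onComplete()`, so observing it, whichever thread's poll returned the response, implies the completion has already happened. That closes the window between steps 4 and 5, where a single foreground poll could return while another thread still held the dequeued but uncompleted response.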