This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a2577b8010 KAFKA-17395 Flaky test testMissingOffsetNoResetPolicy for new consumer (#17056)
4a2577b8010 is described below

commit 4a2577b80102145a3b82886ee3c02d47b8ac791c
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Aug 31 23:57:19 2024 +0800

    KAFKA-17395 Flaky test testMissingOffsetNoResetPolicy for new consumer (#17056)
    
    In AsyncKafkaConsumer, the FindCoordinatorRequest is sent by the background
    thread. MockClient#prepareResponseFrom only matches a prepared response to
    a future request, so if the background thread sends the request before the
    test prepares the response, the FindCoordinatorResponse may never be matched
    to the FindCoordinatorRequest. Calling MockClient#prepareResponseFrom before
    the consumer is created avoids this race and the resulting flaky test.
    
    Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java   | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bff8f970d6d..b2420f29bda 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -924,16 +924,14 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
-                true, groupId, groupInstanceId, false);
-        consumer.assign(singletonList(tp0));
-
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-
-        // lookup committed offset and find nothing
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
 
+        consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
+                true, groupId, groupInstanceId, false);
+        consumer.assign(singletonList(tp0));
+
         if (groupProtocol == GroupProtocol.CONSUMER) {
             // New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
             // by the background thread, so it can realize there are no committed offsets and then
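
The ordering problem described in the commit message can be illustrated with a small, self-contained sketch. This is not Kafka's MockClient: TinyMockClient, prepareResponse, send, and the string payloads are hypothetical names chosen for illustration, and the sketch assumes only what the commit message states, namely that a prepared response is matched to a request sent afterwards. It runs both orderings deterministically on a single thread instead of reproducing the race itself.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in for a mock network client; NOT Kafka's MockClient.
class PreparedResponseOrderingSketch {

    static final class TinyMockClient {
        // Responses queued for requests that have not been sent yet.
        private final Queue<String> futureResponses = new ArrayDeque<>();
        // Requests that arrived when no prepared response was available.
        private final Queue<String> unansweredRequests = new ArrayDeque<>();

        // Queue a response for the next request sent after this call.
        void prepareResponse(String response) {
            futureResponses.add(response);
        }

        // Send a request; it is answered only if a response was prepared earlier.
        Optional<String> send(String request) {
            String response = futureResponses.poll();
            if (response == null) {
                unansweredRequests.add(request);
                return Optional.empty();
            }
            return Optional.of(response);
        }
    }

    // Response prepared first: the later request is matched.
    static Optional<String> prepareThenSend() {
        TinyMockClient client = new TinyMockClient();
        client.prepareResponse("FindCoordinatorResponse");
        return client.send("FindCoordinatorRequest");
    }

    // Request sent first (as a background thread might do): it stays unanswered.
    static Optional<String> sendThenPrepare() {
        TinyMockClient client = new TinyMockClient();
        Optional<String> result = client.send("FindCoordinatorRequest");
        client.prepareResponse("FindCoordinatorResponse"); // too late for the request above
        return result;
    }

    public static void main(String[] args) {
        System.out.println("prepare then send matched: " + prepareThenSend().isPresent());
        System.out.println("send then prepare matched: " + sendThenPrepare().isPresent());
    }
}
```

In the sketch, only the prepare-then-send ordering yields a matched response. This mirrors why the patch moves the prepareResponseFrom calls ahead of newConsumer(...): once the consumer exists, its background thread may send the request at any time.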
