This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a2577b8010 KAFKA-17395 Flaky test testMissingOffsetNoResetPolicy for
new consumer (#17056)
4a2577b8010 is described below
commit 4a2577b80102145a3b82886ee3c02d47b8ac791c
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Aug 31 23:57:19 2024 +0800
KAFKA-17395 Flaky test testMissingOffsetNoResetPolicy for new consumer
(#17056)
In AsyncKafkaConsumer, FindCoordinatorRequest is sent by background thread.
In MockClient#prepareResponseFrom, it only matches the response to a future
request. If there is some race condition, FindCoordinatorResponse may not match
to a FindCoordinatorRequest. It's better to put MockClient#prepareResponseFrom
before the request to avoid flaky test.
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bff8f970d6d..b2420f29bda 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -924,16 +924,14 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor,
- true, groupId, groupInstanceId, false);
- consumer.assign(singletonList(tp0));
-
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(),
node.host(), node.port());
-
- // lookup committed offset and find nothing
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L),
Errors.NONE), coordinator);
+ consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor,
+ true, groupId, groupInstanceId, false);
+ consumer.assign(singletonList(tp0));
+
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the offset fetch
event added by a call to poll, to be processed
// by the background thread, so it can realize there are no
committed offsets and then