This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 75075002b5d KAFKA-17106 enable
testFetchProgressWithMissingPartitionPosition for AsyncConsumer (#16564)
75075002b5d is described below
commit 75075002b5d51b651ae097f3018f5d5db3f1d202
Author: TaiJuWu <[email protected]>
AuthorDate: Sun Jul 14 09:04:20 2024 +0800
KAFKA-17106 enable testFetchProgressWithMissingPartitionPosition for
AsyncConsumer (#16564)
Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 928f2fab869..83cb4a0c47c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -850,10 +850,8 @@ public class KafkaConsumerTest {
assertEquals(55, consumer.committed(Collections.singleton(tp0),
Duration.ZERO).get(tp0).offset());
}
- // TODO: this test triggers a bug with the CONSUMER group protocol
implementation.
- // The bug will be investigated and fixed so this test can use both
group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(value = GroupProtocol.class)
public void testFetchProgressWithMissingPartitionPosition(GroupProtocol
groupProtocol) {
// Verifies that we can make progress on one partition while we are
awaiting
// a reset on another partition.
@@ -861,7 +859,11 @@ public class KafkaConsumerTest {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
- Node node = metadata.fetch().nodes().get(0);
+
+ if (groupProtocol == GroupProtocol.CONSUMER) {
+ Node node = metadata.fetch().nodes().get(0);
+
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node), node);
+ }
consumer = newConsumerNoAutoCommit(groupProtocol, time, client,
subscription, metadata);
consumer.assign(Arrays.asList(tp0, tp1));