This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2cf1ef99d7a KAFKA-17129 Revisit FindCoordinatorResponse in KafkaConsumerTest (#16589)
2cf1ef99d7a is described below
commit 2cf1ef99d7a76b4d53ec0d1a2f5a7be181274456
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Jul 15 08:17:53 2024 +0800
KAFKA-17129 Revisit FindCoordinatorResponse in KafkaConsumerTest (#16589)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumerTest.java | 42 +++++++++-------------
1 file changed, 16 insertions(+), 26 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ee649ac800c..5510e9922da 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -750,12 +750,12 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
// Since we would enable the heartbeat thread after received join-response which could
// send the sync-group on behalf of the consumer if it is enqueued, we may still complete
// the rebalance and send out the fetch; in order to avoid it we do not prepare sync response here.
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, memberId, leaderId, Errors.NONE), coordinator);
@@ -938,10 +938,8 @@ public class KafkaConsumerTest {
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
}
- // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
- // The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testResetToCommittedOffset(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
@@ -950,11 +948,11 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
@@ -973,11 +971,11 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
@@ -1005,10 +1003,8 @@ public class KafkaConsumerTest {
assertEquals(subscription.validPosition(tp0).offset, 20L);
}
- // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
- // The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testCommitsFetchedDuringAssign(GroupProtocol groupProtocol) {
long offset1 = 10000;
long offset2 = 20000;
@@ -1019,11 +1015,11 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
// lookup coordinator
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// fetch offset for one topic
@@ -1044,10 +1040,8 @@ public class KafkaConsumerTest {
assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
}
- // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
- // The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testFetchStableOffsetThrowInCommitted(GroupProtocol groupProtocol) {
assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
}
@@ -1060,10 +1054,8 @@ public class KafkaConsumerTest {
assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
}
- // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
- // The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testFetchStableOffsetThrowInPosition(GroupProtocol groupProtocol) {
assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).position(tp0));
}
@@ -1079,11 +1071,11 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
consumer.assign(singletonList(tp0));
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(
@@ -1091,10 +1083,8 @@ public class KafkaConsumerTest {
return consumer;
}
- // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
- // The bug will be investigated and fixed so this test can use both group protocols.
@ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ @EnumSource(GroupProtocol.class)
public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
long offset1 = 10000;
@@ -1104,11 +1094,11 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(Arrays.asList(tp0, tp1));
// lookup coordinator
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// fetch offset for one topic
@@ -1572,10 +1562,10 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -1627,10 +1617,10 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
// lookup coordinator
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -1680,10 +1670,10 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -3345,12 +3335,12 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
-
+
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
// lookup coordinator
- client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// try to get committed offsets for one topic-partition - but it is disconnected so there's no response and it will time out