This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2cf1ef99d7a KAFKA-17129 Revisit FindCoordinatorResponse in 
KafkaConsumerTest (#16589)
2cf1ef99d7a is described below

commit 2cf1ef99d7a76b4d53ec0d1a2f5a7be181274456
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Jul 15 08:17:53 2024 +0800

    KAFKA-17129 Revisit FindCoordinatorResponse in KafkaConsumerTest (#16589)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 42 +++++++++-------------
 1 file changed, 16 insertions(+), 26 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ee649ac800c..5510e9922da 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -750,12 +750,12 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
         // Since we would enable the heartbeat thread after received 
join-response which could
         // send the sync-group on behalf of the consumer if it is enqueued, we 
may still complete
         // the rebalance and send out the fetch; in order to avoid it we do 
not prepare sync response here.
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
         client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, 
memberId, leaderId, Errors.NONE), coordinator);
 
@@ -938,10 +938,8 @@ public class KafkaConsumerTest {
         assertThrows(NoOffsetForPartitionException.class, () -> 
consumer.poll(Duration.ZERO));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testResetToCommittedOffset(GroupProtocol groupProtocol) {
         SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.NONE);
         ConsumerMetadata metadata = createMetadata(subscription);
@@ -950,11 +948,11 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, 
time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
 
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), 
Errors.NONE), coordinator);
@@ -973,11 +971,11 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor,
                 true, groupId, groupInstanceId, false);
         consumer.assign(singletonList(tp0));
 
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), 
Errors.NONE), coordinator);
@@ -1005,10 +1003,8 @@ public class KafkaConsumerTest {
         assertEquals(subscription.validPosition(tp0).offset, 20L);
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testCommitsFetchedDuringAssign(GroupProtocol groupProtocol) {
         long offset1 = 10000;
         long offset2 = 20000;
@@ -1019,11 +1015,11 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         // fetch offset for one topic
@@ -1044,10 +1040,8 @@ public class KafkaConsumerTest {
         assertEquals(offset2, 
consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testFetchStableOffsetThrowInCommitted(GroupProtocol 
groupProtocol) {
         assertThrows(UnsupportedVersionException.class, () -> 
setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
     }
@@ -1060,10 +1054,8 @@ public class KafkaConsumerTest {
         assertThrows(UnsupportedVersionException.class, () -> 
setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testFetchStableOffsetThrowInPosition(GroupProtocol 
groupProtocol) {
         assertThrows(UnsupportedVersionException.class, () -> 
setupThrowableConsumer(groupProtocol).position(tp0));
     }
@@ -1079,11 +1071,11 @@ public class KafkaConsumerTest {
 
         Node node = metadata.fetch().nodes().get(0);
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(
                 groupProtocol, time, client, subscription, metadata, assignor, 
true, groupId, groupInstanceId, true);
         consumer.assign(singletonList(tp0));
 
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         client.prepareResponseFrom(offsetResponse(
@@ -1091,10 +1083,8 @@ public class KafkaConsumerTest {
         return consumer;
     }
 
-    // TODO: this test triggers a bug with the CONSUMER group protocol 
implementation.
-    //       The bug will be investigated and fixed so this test can use both 
group protocols.
     @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    @EnumSource(GroupProtocol.class)
     public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
         long offset1 = 10000;
 
@@ -1104,11 +1094,11 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
         consumer.assign(Arrays.asList(tp0, tp1));
 
         // lookup coordinator
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         // fetch offset for one topic
@@ -1572,10 +1562,10 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         // manual assignment
@@ -1627,10 +1617,10 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, false, groupInstanceId);
 
         // lookup coordinator
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         // manual assignment
@@ -1680,10 +1670,10 @@ public class KafkaConsumerTest {
 
         ConsumerPartitionAssignor assignor = new RangeAssignor();
 
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
 
         // manual assignment
@@ -3345,12 +3335,12 @@ public void 
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
         
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
-        
+
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
         
         // lookup coordinator
-        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
         
         // try to get committed offsets for one topic-partition - but it is 
disconnected so there's no response and it will time out

Reply via email to