This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ea0503e234 KAFKA-16566: Fix consumer static membership system test 
with new protocol (#15738)
9ea0503e234 is described below

commit 9ea0503e234368d25748a2d7677bd5a1f87bc058
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Fri Apr 19 11:34:05 2024 +0200

    KAFKA-16566: Fix consumer static membership system test with new protocol 
(#15738)
    
    Update the consumer system test that was failing with the new protocol due to static membership behaviour. The behaviour for static consumers that join with a conflicting group instance id differs slightly between the classic and new consumer protocols, so the test expectations needed to be updated.
    
    If static members join with the same instance id:
    
    Classic protocol: all members join the group with the same group instance id, and the first one eventually fails (it receives a heartbeat (HB) error with FencedInstanceIdException).
    
    Consumer protocol: a new member with an instance id already in use cannot join, and the first member remains active (the new member receives an UnreleasedInstanceIdException in the response to its HB to join the group).
    
    This PR keeps the existing single parametrized test, since what is being tested, and part of the test itself, apply to all protocols. It only updates the expectations that differ, based on the protocol parameter.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Kirk True 
<kt...@confluent.io>
---
 tests/kafkatest/tests/client/consumer_test.py | 33 +++++++++++++++++++++------
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index 60c8daa37e9..d8531118df0 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -348,26 +348,45 @@ class OffsetValidationTest(VerifiableConsumerTest):
             consumer.start()
             self.await_members(consumer, len(consumer.nodes))
 
+            num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
-            self.await_members(conflict_consumer, num_conflict_consumers)
-            self.await_members(consumer, len(consumer.nodes) - 
num_conflict_consumers)
+            if group_protocol == consumer_group.classic_group_protocol:
+                # Classic protocol: conflicting members should join, and the 
intial ones with conflicting instance id should fail.
+                self.await_members(conflict_consumer, num_conflict_consumers)
+                self.await_members(consumer, len(consumer.nodes) - 
num_conflict_consumers)
 
-            wait_until(lambda: len(consumer.dead_nodes()) == 
num_conflict_consumers,
+                wait_until(lambda: len(consumer.dead_nodes()) == 
num_conflict_consumers,
                        timeout_sec=10,
                        err_msg="Timed out waiting for the fenced consumers to 
stop")
+            else:
+                # Consumer protocol: Existing members should remain active and 
new conflicting ones should not be able to join.
+                self.await_consumed_messages(consumer)
+                assert num_rebalances == consumer.num_rebalances(), "Static 
consumers attempt to join with instance id in use should not cause a rebalance"
+                assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                assert len(conflict_consumer.joined_nodes()) == 0
+                
+                # Stop existing nodes, so conflicting ones should be able to 
join.
+                consumer.stop_all()
+                wait_until(lambda: len(consumer.dead_nodes()) == 
len(consumer.nodes),
+                           timeout_sec=self.session_timeout_sec+5,
+                           err_msg="Timed out waiting for the consumer to 
shutdown")
+                conflict_consumer.start()
+                self.await_members(conflict_consumer, num_conflict_consumers)
+
+            
         else:
             consumer.start()
             conflict_consumer.start()
 
             wait_until(lambda: len(consumer.joined_nodes()) + 
len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
-                       timeout_sec=self.session_timeout_sec,
-                       err_msg="Timed out waiting for consumers to join, 
expected total %d joined, but only see %d joined from"
+                       timeout_sec=self.session_timeout_sec*2,
+                       err_msg="Timed out waiting for consumers to join, 
expected total %d joined, but only see %d joined from "
                                "normal consumer group and %d from conflict 
consumer group" % \
                                (len(consumer.nodes), 
len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
                        )
             wait_until(lambda: len(consumer.dead_nodes()) + 
len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
-                       timeout_sec=self.session_timeout_sec,
-                       err_msg="Timed out waiting for fenced consumers to die, 
expected total %d dead, but only see %d dead in"
+                       timeout_sec=self.session_timeout_sec*2,
+                       err_msg="Timed out waiting for fenced consumers to die, 
expected total %d dead, but only see %d dead in "
                                "normal consumer group and %d dead in conflict 
consumer group" % \
                                (len(conflict_consumer.nodes), 
len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes()))
                        )

Reply via email to