This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9ea0503e234 KAFKA-16566: Fix consumer static membership system test with new protocol (#15738) 9ea0503e234 is described below commit 9ea0503e234368d25748a2d7677bd5a1f87bc058 Author: Lianet Magrans <98415067+lian...@users.noreply.github.com> AuthorDate: Fri Apr 19 11:34:05 2024 +0200 KAFKA-16566: Fix consumer static membership system test with new protocol (#15738) Update the consumer system test that was failing with the new protocol because of static membership behaviour. The behaviour of static consumers that join with a conflicting group instance id differs slightly between the classic and the new consumer protocol, so the test expectations needed to be updated. If static members join with the same instance id: - Classic protocol: all members join the group with the same group instance id, and then the first one eventually fails (it receives a heartbeat (HB) error with FencedInstanceIdException). - Consumer protocol: the new member with an instance id already in use is not able to join, and the first member remains active (the new member with the same instance id receives an UnreleasedInstanceIdException in the response to its HB to join the group). This PR keeps the single parametrized test that existed before, since what is being tested, and part of the test itself, apply to all protocols. It only updates the expectations that differ, based on the protocol parameter. 
Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Kirk True <kt...@confluent.io> --- tests/kafkatest/tests/client/consumer_test.py | 33 +++++++++++++++++++++------ 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 60c8daa37e9..d8531118df0 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -348,26 +348,45 @@ class OffsetValidationTest(VerifiableConsumerTest): consumer.start() self.await_members(consumer, len(consumer.nodes)) + num_rebalances = consumer.num_rebalances() conflict_consumer.start() - self.await_members(conflict_consumer, num_conflict_consumers) - self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers) + if group_protocol == consumer_group.classic_group_protocol: + # Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail. + self.await_members(conflict_consumer, num_conflict_consumers) + self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers) - wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers, + wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers, timeout_sec=10, err_msg="Timed out waiting for the fenced consumers to stop") + else: + # Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join. + self.await_consumed_messages(consumer) + assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance" + assert len(consumer.joined_nodes()) == len(consumer.nodes) + assert len(conflict_consumer.joined_nodes()) == 0 + + # Stop existing nodes, so conflicting ones should be able to join. 
+ consumer.stop_all() + wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes), + timeout_sec=self.session_timeout_sec+5, + err_msg="Timed out waiting for the consumer to shutdown") + conflict_consumer.start() + self.await_members(conflict_consumer, num_conflict_consumers) + + else: consumer.start() conflict_consumer.start() wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes), - timeout_sec=self.session_timeout_sec, - err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from" + timeout_sec=self.session_timeout_sec*2, + err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from " "normal consumer group and %d from conflict consumer group" % \ (len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes())) ) wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), - timeout_sec=self.session_timeout_sec, - err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in" + timeout_sec=self.session_timeout_sec*2, + err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in " "normal consumer group and %d dead in conflict consumer group" % \ (len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes())) )