This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 75768dd3383 MINOR: Prevent re-join flakiness in 
test_fencing_static_consumer by ensuring conflicting static consumers terminate 
(#20772)
75768dd3383 is described below

commit 75768dd3383aaa98edd243351718c987808f7dab
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Fri Oct 31 14:52:49 2025 +0800

    MINOR: Prevent re-join flakiness in test_fencing_static_consumer by 
ensuring conflicting static consumers terminate (#20772)
    
    Related discussion:
    https://github.com/apache/kafka/pull/20594#pullrequestreview-3362587453
    
    ### Problem
    The test `OffsetValidationTest.test_fencing_static_consumer` failed when
    executed with
    `fencing_stage=stable` and `group_protocol=consumer`.
    It timed out while waiting for the group to become empty because the
    conflicting static consumers re-joined after the original members
    stopped, keeping the group non-empty and causing the timeout.
    
    ### Fix
    For the consumer-protocol path, the test now waits for all conflicting
    consumer processes to terminate before stopping the original static
    members. This ensures that each conflicting consumer is fully fenced
    and cannot re-join the group after the original members stop.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 tests/kafkatest/services/verifiable_consumer.py | 14 ++++++++++++--
 tests/kafkatest/tests/client/consumer_test.py   | 15 ++++++++++++++-
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index 9a8a294a2aa..6e9716761f6 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -42,7 +42,8 @@ class ConsumerEventHandler(object):
 
     def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
                  revoked_count=0, assigned_count=0, assignment=None,
-                 position=None, committed=None, total_consumed=0):
+                 position=None, committed=None, total_consumed=0,
+                 shutdown_complete=False):
         self.node = node
         self.verify_offsets = verify_offsets
         self.idx = idx
@@ -53,11 +54,13 @@ class ConsumerEventHandler(object):
         self.position = position if position is not None else {}
         self.committed = committed if committed is not None else {}
         self.total_consumed = total_consumed
+        self.shutdown_complete = shutdown_complete
 
     def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
         self.assignment = []
         self.position = {}
+        self.shutdown_complete = True 
 
         if node is not None and logger is not None:
             logger.debug("Shut down %s" % node.account.hostname)
@@ -277,7 +280,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
                                      assignment=existing_handler.assignment,
                                      position=existing_handler.position,
                                      committed=existing_handler.committed,
-                                     
total_consumed=existing_handler.total_consumed)
+                                     
total_consumed=existing_handler.total_consumed,
+                                     
shutdown_complete=existing_handler.shutdown_complete)
             else:
                 return handler_class(node, self.verify_offsets, idx)
         existing_handler = self.event_handlers[node] if node in 
self.event_handlers else None
@@ -292,6 +296,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         with self.lock:
             self.event_handlers[node] = self.create_event_handler(idx, node)
             handler = self.event_handlers[node]
+            handler.shutdown_complete = False
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, 
allow_fail=False)
 
@@ -526,5 +531,10 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
             return [handler.node for handler in self.event_handlers.values()
                     if handler.state != ConsumerState.Dead]
 
+    def shutdown_complete_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.shutdown_complete]
+
     def is_consumer_group_protocol_enabled(self):
         return self.group_protocol and self.group_protocol.lower() == 
consumer_group.consumer_group_protocol
diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index cb964e4c303..7a8614a1cd9 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@ from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 
 import signal
@@ -74,6 +75,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
         self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
         return consumer
 
+    def await_conflict_consumers_fenced(self, conflict_consumer):
+        # Rely on explicit shutdown_complete events from the verifiable 
consumer to guarantee each conflict member
+        # reached the fenced path rather than remaining in the default DEAD 
state prior to startup.
+        wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == 
len(conflict_consumer.nodes) and 
+                           len(conflict_consumer.dead_nodes()) == 
len(conflict_consumer.nodes),
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for conflict consumers to report 
shutdown completion after fencing")
+        
     @cluster(num_nodes=7)
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
@@ -326,7 +335,11 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 assert num_rebalances == consumer.num_rebalances(), "Static 
consumers attempt to join with instance id in use should not cause a rebalance"
                 assert len(consumer.joined_nodes()) == len(consumer.nodes)
                 assert len(conflict_consumer.joined_nodes()) == 0
-                
+
+                # Conflict consumers will terminate due to a fatal 
UnreleasedInstanceIdException error.
+                # Wait for termination to complete to prevent conflict 
consumers from immediately re-joining the group while existing nodes are 
shutting down.
+                self.await_conflict_consumers_fenced(conflict_consumer)
+
                 # Stop existing nodes, so conflicting ones should be able to 
join.
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == 
len(consumer.nodes),

Reply via email to