This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new e1d831396d8 MINOR: cherry-pick from trunk fix for flaky fence static consumer system tests (#21034)
e1d831396d8 is described below

commit e1d831396d86fce76c8c3f1c38bf330f95643a8c
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Dec 1 16:27:09 2025 -0500

    MINOR: cherry-pick from trunk fix for flaky fence static consumer system tests (#21034)
    
    cherry-picked from trunk:
    ```
    MINOR: Prevent re-join flakiness in test_fencing_static_consumer by
    ensuring conflicting static consumers terminate (#20772)
    
    Related discussion:
    https://github.com/apache/kafka/pull/20594#pullrequestreview-3362587453
    
    ### Problem
    The test `OffsetValidationTest.test_fencing_static_consumer` failed when
    executed with `fencing_stage=stable` and `group_protocol=consumer`. It
    timed out while waiting for the group to become empty because the
    conflicting static consumers re-joined after the original members
    stopped, keeping the group non-empty and causing the timeout.
    
    ### Fix
    For the consumer-protocol path, the test now waits for all conflicting
    consumer processes to terminate before stopping the original static
    members. This ensures that each conflicting consumer is fully fenced
    and cannot re-join the group after the original members stop.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
    ```
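
    Why the fix tracks an explicit flag instead of reusing the existing
    dead-node check: each consumer event handler *starts* in the DEAD state,
    so "state == Dead" cannot distinguish a member that never started from
    one that terminated after being fenced. A minimal sketch of that
    distinction, with simplified stand-ins for the handler and its states
    (only the field and method names mirror the patch below; the rest is
    illustrative):

    ```
    from enum import Enum

    class ConsumerState(Enum):
        Dead = 0
        Joined = 1

    class ConsumerEventHandler:
        def __init__(self):
            # Dead is also the initial state, so it cannot prove the process
            # ever ran, let alone that it exited after being fenced.
            self.state = ConsumerState.Dead
            self.shutdown_complete = False  # set only by an explicit event

        def handle_shutdown_complete(self):
            # Fired when the consumer process actually exits, e.g. after the
            # fatal UnreleasedInstanceIdException on the fenced path.
            self.state = ConsumerState.Dead
            self.shutdown_complete = True
    ```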
    
    Reviewers: Lianet Magrans <[email protected]>
    
    Co-authored-by: Hong-Yi Chen <[email protected]>
---
 tests/kafkatest/services/verifiable_consumer.py | 14 ++++++++++++--
 tests/kafkatest/tests/client/consumer_test.py   | 13 +++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 8264566f1c2..2e19bc6757a 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -42,7 +42,8 @@ class ConsumerEventHandler(object):
 
     def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
                  revoked_count=0, assigned_count=0, assignment=None,
-                 position=None, committed=None, total_consumed=0):
+                 position=None, committed=None, total_consumed=0,
+                 shutdown_complete=False):
         self.node = node
         self.verify_offsets = verify_offsets
         self.idx = idx
@@ -53,11 +54,13 @@ class ConsumerEventHandler(object):
         self.position = position if position is not None else {}
         self.committed = committed if committed is not None else {}
         self.total_consumed = total_consumed
+        self.shutdown_complete = shutdown_complete
 
     def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
         self.assignment = []
         self.position = {}
+        self.shutdown_complete = True
 
         if node is not None and logger is not None:
             logger.debug("Shut down %s" % node.account.hostname)
@@ -277,7 +280,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                                      assignment=existing_handler.assignment,
                                      position=existing_handler.position,
                                      committed=existing_handler.committed,
-                                     total_consumed=existing_handler.total_consumed)
+                                     total_consumed=existing_handler.total_consumed,
+                                     shutdown_complete=existing_handler.shutdown_complete)
             else:
                 return handler_class(node, self.verify_offsets, idx)
         existing_handler = self.event_handlers[node] if node in self.event_handlers else None
@@ -292,6 +296,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         with self.lock:
             self.event_handlers[node] = self.create_event_handler(idx, node)
             handler = self.event_handlers[node]
+            handler.shutdown_complete = False
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
 
@@ -521,5 +526,10 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             return [handler.node for handler in self.event_handlers.values()
                     if handler.state != ConsumerState.Dead]
 
+    def shutdown_complete_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.shutdown_complete]
+
     def is_consumer_group_protocol_enabled(self):
         return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 4edaa498bfb..85a0282e291 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@ from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 
 import signal
@@ -74,6 +75,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
         self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
         return consumer
 
+    def await_conflict_consumers_fenced(self, conflict_consumer):
+        # Rely on explicit shutdown_complete events from the verifiable consumer to guarantee each conflict member
+        # reached the fenced path rather than remaining in the default DEAD state prior to startup.
+        wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
+                           len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")
+
     @cluster(num_nodes=7)
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
@@ -332,6 +341,10 @@ class OffsetValidationTest(VerifiableConsumerTest):
                     raise
                 assert len(conflict_consumer.joined_nodes()) == 0
 
+                # Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
+                # Wait for termination to complete to prevent conflict consumers from immediately re-joining the group while existing nodes are shutting down.
+                self.await_conflict_consumers_fenced(conflict_consumer)
+
                 # Stop existing nodes, so conflicting ones should be able to join.
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),

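For reference, the patched fencing stage now runs in this order. Below is a
condensed, hypothetical outline of the test flow, not the full test: the
wrapper function name, the timeout, and the error message are illustrative,
while `joined_nodes`, `dead_nodes`, `stop_all`, and
`await_conflict_consumers_fenced` come from the patch above.

```
from ducktape.utils.util import wait_until

def fencing_stage_stable(test, consumer, conflict_consumer):
    # Conflicting static members were fenced by the broker and never joined.
    assert len(conflict_consumer.joined_nodes()) == 0

    # NEW: block until every fenced process has really exited, so none of
    # them can race back into the group once the original members release
    # their group.instance.id values.
    test.await_conflict_consumers_fenced(conflict_consumer)

    # Only then stop the original static members and wait for them to die.
    consumer.stop_all()
    wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
               timeout_sec=60,
               err_msg="Timed out waiting for original members to shut down")
```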