This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4bf65a758d6 MINOR: fixed fail_many_brokers behavior if failure mode is not clean_shutdown. (#20710)
4bf65a758d6 is described below

commit 4bf65a758d6798c154861e8cddb1b984acd74baf
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Mon Oct 20 16:48:58 2025 -0700

    MINOR: fixed fail_many_brokers behavior if failure mode is not clean_shutdown. (#20710)
    
    Fixes fail_many_brokers behavior when the failure mode is not clean_shutdown: instead of only choosing between SIGTERM and SIGKILL, fail_many_brokers now dispatches to mode-specific bulk helpers, so the bounce modes actually restart the brokers they kill.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../tests/streams/streams_broker_bounce_test.py    | 52 +++++++++++++++-------
 1 file changed, 37 insertions(+), 15 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 3d7e40fdf0d..7b0ea9becd3 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -63,24 +63,53 @@ def hard_bounce(test, topic, broker_type):
         prev_broker_node = broker_node(test, topic, broker_type)
         test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
 
-        # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper has registered the loss by expiring the broker's session timeout.
-
-        wait_until(lambda: not test.kafka.pids(prev_broker_node) and
-                           not (quorum.for_test(test.test_context) == quorum.zk and test.kafka.is_registered(prev_broker_node)),
+        wait_until(lambda: not test.kafka.pids(prev_broker_node),
                    timeout_sec=test.kafka.zk_session_timeout + 5,
                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
 
         test.kafka.start_node(prev_broker_node)
-    
 
-    
+
+def bulk_clean_shutdown(test, num_failures):
+    for num in range(0, num_failures - 1):
+        signal_node(test, test.kafka.nodes[num], signal.SIGTERM)
+
+def bulk_hard_shutdown(test, num_failures):
+    for num in range(0, num_failures - 1):
+        signal_node(test, test.kafka.nodes[num], signal.SIGKILL)
+
+def bulk_clean_bounce(test, num_failures):
+    for i in range(5):
+        for num in range(0, num_failures - 1):
+            prev_broker_node = test.kafka.nodes[num]
+            test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
+
+def bulk_hard_bounce(test, num_failures):
+    for i in range(5):
+        for num in range(0, num_failures - 1):
+            prev_broker_node = test.kafka.nodes[num]
+            test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
+
+            wait_until(lambda: not test.kafka.pids(prev_broker_node),
+                       timeout_sec=test.kafka.zk_session_timeout + 5,
+                       err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
+
+            test.kafka.start_node(prev_broker_node)
+
+
 failures = {
     "clean_shutdown": clean_shutdown,
     "hard_shutdown": hard_shutdown,
     "clean_bounce": clean_bounce,
     "hard_bounce": hard_bounce
 }
+
+many_failures = {
+    "clean_shutdown": bulk_clean_shutdown,
+    "hard_shutdown": bulk_hard_shutdown,
+    "clean_bounce": bulk_clean_bounce,
+    "hard_bounce": bulk_hard_bounce
+}
         
 class StreamsBrokerBounceTest(Test):
     """
@@ -123,14 +152,7 @@ class StreamsBrokerBounceTest(Test):
         failures[failure_mode](self, topic, broker_type)
 
     def fail_many_brokers(self, failure_mode, num_failures):
-        sig = signal.SIGTERM
-        if (failure_mode == "clean_shutdown"):
-            sig = signal.SIGTERM
-        else:
-            sig = signal.SIGKILL
-            
-        for num in range(0, num_failures - 1):
-            signal_node(self, self.kafka.nodes[num], sig)
+        many_failures[failure_mode](self, num_failures)
 
     def confirm_topics_on_all_brokers(self, expected_topic_set):
         for node in self.kafka.nodes:

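For readers skimming the patch: before this change, fail_many_brokers only picked a signal (SIGTERM for clean_shutdown, SIGKILL for everything else) and signaled each node once, so the *_bounce modes never restarted the brokers they killed. Below is a minimal, runnable sketch of the dispatch-table pattern the patch adopts; StubKafka and StubTest are hypothetical stand-ins for the kafkatest/ducktape fixtures, not code from this repository.

    import signal

    class StubKafka:
        """Hypothetical stand-in for kafkatest's KafkaService."""
        def __init__(self, num_nodes):
            self.nodes = ["broker-%d" % i for i in range(num_nodes)]
        def signal_node(self, node, sig):
            # the stub only prints; it never sends a real signal
            print("send %s to %s" % (signal.Signals(sig).name, node))
        def start_node(self, node):
            print("restart %s" % node)

    class StubTest:
        """Hypothetical stand-in for the ducktape Test fixture."""
        def __init__(self):
            self.kafka = StubKafka(3)

    def bulk_hard_shutdown(test, num_failures):
        # hard_shutdown: SIGKILL each broker, no restart
        for num in range(0, num_failures - 1):
            test.kafka.signal_node(test.kafka.nodes[num], signal.SIGKILL)

    def bulk_hard_bounce(test, num_failures):
        # hard_bounce: SIGKILL, then restart -- the step the old
        # signal-only implementation skipped
        for num in range(0, num_failures - 1):
            node = test.kafka.nodes[num]
            test.kafka.signal_node(node, signal.SIGKILL)
            test.kafka.start_node(node)

    many_failures = {
        "hard_shutdown": bulk_hard_shutdown,
        "hard_bounce": bulk_hard_bounce,
    }

    def fail_many_brokers(test, failure_mode, num_failures):
        # a single table lookup replaces the old SIGTERM/SIGKILL branch
        many_failures[failure_mode](test, num_failures)

    if __name__ == "__main__":
        fail_many_brokers(StubTest(), "hard_bounce", 3)

Running the sketch prints the kill/restart sequence for the first two stub brokers, mirroring the range(0, num_failures - 1) loop bound used in the patch itself.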