This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4bf65a758d6 MINOR: fixed fail_many_brokers behavior if failure mode is not clean_shutdown. (#20710)
4bf65a758d6 is described below
commit 4bf65a758d6798c154861e8cddb1b984acd74baf
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Mon Oct 20 16:48:58 2025 -0700
MINOR: fixed fail_many_brokers behavior if failure mode is not clean_shutdown. (#20710)
Fixes fail_many_brokers behavior if failure mode is not clean_shutdown.
Reviewers: Matthias J. Sax <[email protected]>
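
For context: before this change, fail_many_brokers only sent one signal per broker (SIGTERM for clean_shutdown, SIGKILL for everything else), so the bounce modes never restarted the killed brokers; the patch below replaces that with a per-mode dispatch table. The snippet here is a minimal, self-contained sketch of that dispatch pattern, not the test file itself; FakeCluster and its print-based methods are hypothetical stand-ins for the real KafkaService calls.

# Illustrative sketch of the per-mode dispatch the patch introduces.
# FakeCluster and its methods are hypothetical stand-ins, not Kafka test code.
import signal

class FakeCluster:
    def __init__(self, nodes):
        self.nodes = nodes

    def signal_node(self, node, sig):
        print("send signal %s to %s" % (sig, node))

    def restart_node(self, node, clean_shutdown=True):
        print("restart %s (clean=%s)" % (node, clean_shutdown))

def bulk_clean_shutdown(cluster, num_failures):
    # Clean shutdown: SIGTERM only, brokers stay down.
    for node in cluster.nodes[:num_failures]:
        cluster.signal_node(node, signal.SIGTERM)

def bulk_hard_bounce(cluster, num_failures):
    # Hard bounce: SIGKILL, then bring the broker back up.
    for node in cluster.nodes[:num_failures]:
        cluster.signal_node(node, signal.SIGKILL)
        cluster.restart_node(node, clean_shutdown=False)

# Dispatch by failure mode instead of collapsing every mode to one signal.
many_failures = {
    "clean_shutdown": bulk_clean_shutdown,
    "hard_bounce": bulk_hard_bounce,
}

def fail_many_brokers(cluster, failure_mode, num_failures):
    many_failures[failure_mode](cluster, num_failures)

if __name__ == "__main__":
    fail_many_brokers(FakeCluster(["broker1", "broker2", "broker3"]), "hard_bounce", 2)
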
---
.../tests/streams/streams_broker_bounce_test.py | 52 +++++++++++++++-------
1 file changed, 37 insertions(+), 15 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 3d7e40fdf0d..7b0ea9becd3 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -63,24 +63,53 @@ def hard_bounce(test, topic, broker_type):
         prev_broker_node = broker_node(test, topic, broker_type)
         test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
-        # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper has registered the loss by expiring the broker's session timeout.
-
-        wait_until(lambda: not test.kafka.pids(prev_broker_node) and
-                   not (quorum.for_test(test.test_context) == quorum.zk and test.kafka.is_registered(prev_broker_node)),
+        wait_until(lambda: not test.kafka.pids(prev_broker_node),
                    timeout_sec=test.kafka.zk_session_timeout + 5,
                    err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
         test.kafka.start_node(prev_broker_node)
-
-
+
+def bulk_clean_shutdown(test, num_failures):
+    for num in range(0, num_failures - 1):
+        signal_node(test, test.kafka.nodes[num], signal.SIGTERM)
+
+def bulk_hard_shutdown(test, num_failures):
+    for num in range(0, num_failures - 1):
+        signal_node(test, test.kafka.nodes[num], signal.SIGKILL)
+
+def bulk_clean_bounce(test, num_failures):
+    for i in range(5):
+        for num in range(0, num_failures - 1):
+            prev_broker_node = test.kafka.nodes[num]
+            test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
+
+def bulk_hard_bounce(test, num_failures):
+    for i in range(5):
+        for num in range(0, num_failures - 1):
+            prev_broker_node = test.kafka.nodes[num]
+            test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
+
+            wait_until(lambda: not test.kafka.pids(prev_broker_node),
+                       timeout_sec=test.kafka.zk_session_timeout + 5,
+                       err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
+
+            test.kafka.start_node(prev_broker_node)
+
+
 failures = {
     "clean_shutdown": clean_shutdown,
     "hard_shutdown": hard_shutdown,
     "clean_bounce": clean_bounce,
     "hard_bounce": hard_bounce
 }
+
+many_failures = {
+    "clean_shutdown": bulk_clean_shutdown,
+    "hard_shutdown": bulk_hard_shutdown,
+    "clean_bounce": bulk_clean_bounce,
+    "hard_bounce": bulk_hard_bounce
+}
 class StreamsBrokerBounceTest(Test):
     """
@@ -123,14 +152,7 @@ class StreamsBrokerBounceTest(Test):
         failures[failure_mode](self, topic, broker_type)
 
     def fail_many_brokers(self, failure_mode, num_failures):
-        sig = signal.SIGTERM
-        if (failure_mode == "clean_shutdown"):
-            sig = signal.SIGTERM
-        else:
-            sig = signal.SIGKILL
-
-        for num in range(0, num_failures - 1):
-            signal_node(self, self.kafka.nodes[num], sig)
+        many_failures[failure_mode](self, num_failures)
 
     def confirm_topics_on_all_brokers(self, expected_topic_set):
         for node in self.kafka.nodes:
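
A closing note on the one non-obvious step in the new bulk_hard_bounce handler: after the SIGKILL it waits until the broker's PIDs are gone before calling start_node, so the restart cannot race the dying process. Below is a minimal stand-alone sketch of that kill / wait-for-death / restart pattern; poll_until, kill_process, pids_of and start_process are hypothetical placeholders for the ducktape wait_until helper and the KafkaService node operations.

# Sketch of the kill -> wait-for-death -> restart sequence used by the new
# bulk_hard_bounce handler. All names here are hypothetical placeholders.
import time

def poll_until(condition, timeout_sec, err_msg):
    # Simplified analogue of ducktape's wait_until.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return
        time.sleep(0.5)
    raise TimeoutError(err_msg)

def hard_bounce_one(node, kill_process, pids_of, start_process, session_timeout_sec):
    kill_process(node)                        # SIGKILL in the real test
    poll_until(lambda: not pids_of(node),     # confirm the process is really gone
               timeout_sec=session_timeout_sec + 5,
               err_msg="broker %s did not terminate in time" % node)
    start_process(node)                       # restart only after the old PID disappears
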