This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4377ee08081 KAFKA-19421: Deflake streams_broker_down_resilience_test (#19999)
4377ee08081 is described below
commit 4377ee08081764f56e0521f2bd43f1ca9a484aa4
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 23 10:30:13 2025 +0200
KAFKA-19421: Deflake streams_broker_down_resilience_test (#19999)
`streams_broker_down_resilience_test` produces messages with `null` key
to a topic with three partitions and expects each partition to be
non-empty afterward. But this assumption is not correct: a producer may
use sticky partitioning and produce to only two of the partitions, which
causes occasional flakiness in the test.
The fix is to produce records with keys.
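A minimal toy sketch (plain Python, not part of this patch) of the
reasoning above; the modulo below is only a stand-in for the producer's
key-hashing partitioner:

    num_partitions = 3
    num_messages = 10

    # With repeating keys 0..num_messages-1, distinct keys spread across
    # partitions, so every partition receives at least one record.
    keyed = {key % num_partitions for key in range(num_messages)}
    assert keyed == set(range(num_partitions))

    # With null keys, a sticky partitioner may pin the whole batch to a
    # single partition, leaving other partitions empty.
    sticky = {0 for _ in range(num_messages)}
    assert len(sticky) < num_partitions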
Reviewers: Matthias J. Sax <[email protected]>, PoAn Yang <[email protected]>
---
tests/kafkatest/tests/streams/base_streams_test.py | 4 ++--
.../tests/streams/streams_broker_down_resilience_test.py | 16 ++++++++++++----
2 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 81cad7a4d1b..da00e0895f2 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -82,8 +82,8 @@ class BaseStreamsTest(Test):
         self.assert_consume(client_id, test_state, streams_sink_topic, num_messages, timeout_sec)
-    def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60):
-        producer = self.get_producer(topic, num_messages)
+    def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60, repeating_keys=None):
+        producer = self.get_producer(topic, num_messages, repeating_keys=repeating_keys)
         producer.start()
         wait_until(lambda: producer.num_acked >= num_messages,
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 3b9d0b43bf7..94df6e37473 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -129,10 +129,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
                 with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+                    # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         "sending_message_after_broker_down_initially",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,
@@ -189,10 +191,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
                 with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+                    # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         "sending_message_normal_broker_start",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,
@@ -273,10 +277,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
                 with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+                    # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         "sending_message_after_normal_broker_start",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,
@@ -320,10 +326,12 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2:
                 with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3:
+                    # repeating_keys enables production of records with keys, ensuring that we produce to all 3 partitions
                     self.assert_produce(self.inputTopic,
                                         "sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
                                         num_messages=self.num_messages,
-                                        timeout_sec=120)
+                                        timeout_sec=120,
+                                        repeating_keys=self.num_messages)
                     monitor_1.wait_until(self.message,
                                          timeout_sec=120,