This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 261e8613404 KAFKA-19422: Deflake streams_application_upgrade_test
(#20004)
261e8613404 is described below
commit 261e8613404096f062f7284ccca0e2ab849758cd
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jun 23 17:41:05 2025 +0200
KAFKA-19422: Deflake streams_application_upgrade_test (#20004)
In this upgrade test, applications running the old version sometimes crash
while shutting down before the upgrade. The test is actually triggering a bug
in several older versions (2.x and possibly others); it seems to be a rare
race condition that has been happening since 2022. Since we are not going to
roll out a patch release for Kafka Streams 2.x, the test should simply
tolerate applications crashing on shutdown before the upgrade when the
starting version is 2.x.
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/streams_application_upgrade_test.py | 41 ++++++++++++++--------
1 file changed, 26 insertions(+), 15 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index 78ea4141496..6831ec896e8 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -108,10 +108,10 @@ class StreamsUpgradeTest(Test):
random.shuffle(self.processors)
for p in self.processors:
p.CLEAN_NODE_ENABLED = False
- self.do_stop_start_bounce(p, None, to_version, counter)
+ self.do_stop_start_bounce(p, None, from_version, to_version,
counter)
counter = counter + 1
elif bounce_type == "full":
- self.restart_all_nodes_with(to_version)
+ self.restart_all_nodes_with(from_version, to_version)
else:
raise Exception("Unrecognized bounce_type: " + str(bounce_type))
@@ -157,30 +157,36 @@ class StreamsUpgradeTest(Test):
self.wait_for_verification(self.processor2, self.processed_msg,
self.processor2.STDOUT_FILE)
self.wait_for_verification(self.processor3, self.processed_msg,
self.processor3.STDOUT_FILE)
- def restart_all_nodes_with(self, version):
+ def restart_all_nodes_with(self, from_version, to_version):
self.processor1.stop_node(self.processor1.node)
self.processor2.stop_node(self.processor2.node)
self.processor3.stop_node(self.processor3.node)
# make sure the members have stopped
- self.wait_for_verification(self.processor1,
"SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE)
- self.wait_for_verification(self.processor2,
"SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE)
- self.wait_for_verification(self.processor3,
"SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE)
+ if from_version.startswith("2."):
+ # some older versions crash on shutdown, so we allow crashes here.
+ self.wait_for_verification(self.processor1,
"SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor1.STDOUT_FILE)
+ self.wait_for_verification(self.processor2,
"SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor2.STDOUT_FILE)
+ self.wait_for_verification(self.processor3,
"SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor3.STDOUT_FILE)
+ else:
+ self.wait_for_verification(self.processor1,
"SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE)
+ self.wait_for_verification(self.processor2,
"SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE)
+ self.wait_for_verification(self.processor3,
"SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE)
self.roll_logs(self.processor1, ".1-1")
self.roll_logs(self.processor2, ".1-1")
self.roll_logs(self.processor3, ".1-1")
- self.set_version(self.processor1, version)
- self.set_version(self.processor2, version)
- self.set_version(self.processor3, version)
+ self.set_version(self.processor1, to_version)
+ self.set_version(self.processor2, to_version)
+ self.set_version(self.processor3, to_version)
self.processor1.start_node(self.processor1.node)
self.processor2.start_node(self.processor2.node)
self.processor3.start_node(self.processor3.node)
# double-check the version
- kafka_version_str = self.get_version_string(version)
+ kafka_version_str = self.get_version_string(to_version)
self.wait_for_verification(self.processor1, kafka_version_str,
self.processor1.LOG_FILE)
self.wait_for_verification(self.processor2, kafka_version_str,
self.processor2.LOG_FILE)
self.wait_for_verification(self.processor3, kafka_version_str,
self.processor3.LOG_FILE)
@@ -226,8 +232,8 @@ class StreamsUpgradeTest(Test):
def purge_state_dir(self, processor):
processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT,
allow_fail=False)
- def do_stop_start_bounce(self, processor, upgrade_from, new_version,
counter):
- kafka_version_str = self.get_version_string(new_version)
+ def do_stop_start_bounce(self, processor, upgrade_from, from_version,
to_version, counter):
+ kafka_version_str = self.get_version_string(to_version)
first_other_processor = None
second_other_processor = None
@@ -252,7 +258,12 @@ class StreamsUpgradeTest(Test):
second_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s'
on " % self.processed_msg + str(second_other_node.account))
- node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" %
processor.STDOUT_FILE, allow_fail=False)
+
+ if from_version.startswith("2."):
+ # some older versions crash on shutdown, so we allow crashes here.
+ node.account.ssh_capture("grep -E
'SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)' %s" % processor.STDOUT_FILE,
allow_fail=False)
+ else:
+ node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-CLOSED' %s" %
processor.STDOUT_FILE, allow_fail=False)
if upgrade_from is None: # upgrade disabled -- second round of
rolling bounces
roll_counter = ".1-" # second round of rolling bounces
@@ -261,7 +272,7 @@ class StreamsUpgradeTest(Test):
self.roll_logs(processor, roll_counter + str(counter))
- self.set_version(processor, new_version)
+ self.set_version(processor, to_version)
processor.set_upgrade_from(upgrade_from)
grep_metadata_error = "grep
\"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
subscription data: version=2\" "
@@ -273,7 +284,7 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
- err_msg="Could not detect Kafka
Streams version " + new_version + " on " + str(node.account))
+ err_msg="Could not detect Kafka
Streams version " + to_version + " on " + str(node.account))
first_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw
output '%s' on " % self.processed_msg + str(first_other_node.account))