This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5f1623adec3d6ab5f5b0c15295c5d191da1f62ab Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Mon Jun 23 17:41:05 2025 +0200 KAFKA-19422: Deflake streams_application_upgrade_test (#20004) In this upgrade test, applications sometimes crash before the upgrade because the test triggers a bug in several older versions (2.x and possibly others). It seems to be a rare race condition that has been occurring since 2022. Since we are not going to roll out a patch release for Kafka Streams 2.x, we simply allow applications upgrading from 2.x to crash before the upgrade. Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../streams/streams_application_upgrade_test.py | 41 ++++++++++++++-------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py index 78ea4141496..6831ec896e8 100644 --- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py @@ -108,10 +108,10 @@ class StreamsUpgradeTest(Test): random.shuffle(self.processors) for p in self.processors: p.CLEAN_NODE_ENABLED = False - self.do_stop_start_bounce(p, None, to_version, counter) + self.do_stop_start_bounce(p, None, from_version, to_version, counter) counter = counter + 1 elif bounce_type == "full": - self.restart_all_nodes_with(to_version) + self.restart_all_nodes_with(from_version, to_version) else: raise Exception("Unrecognized bounce_type: " + str(bounce_type)) @@ -157,30 +157,36 @@ class StreamsUpgradeTest(Test): self.wait_for_verification(self.processor2, self.processed_msg, self.processor2.STDOUT_FILE) self.wait_for_verification(self.processor3, self.processed_msg, self.processor3.STDOUT_FILE) - def restart_all_nodes_with(self, version): + def restart_all_nodes_with(self, from_version, to_version): self.processor1.stop_node(self.processor1.node) 
self.processor2.stop_node(self.processor2.node) self.processor3.stop_node(self.processor3.node) # make sure the members have stopped - self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE) - self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE) - self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE) + if from_version.startswith("2."): + # some older versions crash on shutdown, so we allow crashes here. + self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor1.STDOUT_FILE) + self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor2.STDOUT_FILE) + self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor3.STDOUT_FILE) + else: + self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE) + self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE) + self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE) self.roll_logs(self.processor1, ".1-1") self.roll_logs(self.processor2, ".1-1") self.roll_logs(self.processor3, ".1-1") - self.set_version(self.processor1, version) - self.set_version(self.processor2, version) - self.set_version(self.processor3, version) + self.set_version(self.processor1, to_version) + self.set_version(self.processor2, to_version) + self.set_version(self.processor3, to_version) self.processor1.start_node(self.processor1.node) self.processor2.start_node(self.processor2.node) self.processor3.start_node(self.processor3.node) # double-check the version - kafka_version_str = self.get_version_string(version) + kafka_version_str = self.get_version_string(to_version) self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE) 
self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE) self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE) @@ -226,8 +232,8 @@ class StreamsUpgradeTest(Test): def purge_state_dir(self, processor): processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False) - def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter): - kafka_version_str = self.get_version_string(new_version) + def do_stop_start_bounce(self, processor, upgrade_from, from_version, to_version, counter): + kafka_version_str = self.get_version_string(to_version) first_other_processor = None second_other_processor = None @@ -252,7 +258,12 @@ class StreamsUpgradeTest(Test): second_other_monitor.wait_until(self.processed_msg, timeout_sec=60, err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account)) - node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) + + if from_version.startswith("2."): + # some older versions crash on shutdown, so we allow crashes here. 
+ node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)' %s" % processor.STDOUT_FILE, allow_fail=False) + else: + node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-CLOSED' %s" % processor.STDOUT_FILE, allow_fail=False) if upgrade_from is None: # upgrade disabled -- second round of rolling bounces roll_counter = ".1-" # second round of rolling bounces @@ -261,7 +272,7 @@ class StreamsUpgradeTest(Test): self.roll_logs(processor, roll_counter + str(counter)) - self.set_version(processor, new_version) + self.set_version(processor, to_version) processor.set_upgrade_from(upgrade_from) grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" " @@ -273,7 +284,7 @@ class StreamsUpgradeTest(Test): log_monitor.wait_until(kafka_version_str, timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account)) + err_msg="Could not detect Kafka Streams version " + to_version + " on " + str(node.account)) first_other_monitor.wait_until(self.processed_msg, timeout_sec=60, err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))