This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5f1623adec3d6ab5f5b0c15295c5d191da1f62ab
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Mon Jun 23 17:41:05 2025 +0200

    KAFKA-19422: Deflake streams_application_upgrade_test (#20004)
    
    In this upgrade test, applications sometimes crash on shutdown before
    the upgrade, which means the test is actually triggering a bug in
    several older versions (2.x and possibly others). It seems to be a rare
    race condition that has been happening since 2022. Since we are not
    going to roll out a patch release for Kafka Streams 2.x, the test should
    simply allow applications to crash before the upgrade.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../streams/streams_application_upgrade_test.py    | 41 ++++++++++++++--------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index 78ea4141496..6831ec896e8 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -108,10 +108,10 @@ class StreamsUpgradeTest(Test):
             random.shuffle(self.processors)
             for p in self.processors:
                 p.CLEAN_NODE_ENABLED = False
-                self.do_stop_start_bounce(p, None, to_version, counter)
+                self.do_stop_start_bounce(p, None, from_version, to_version, counter)
                 counter = counter + 1
         elif bounce_type == "full":
-            self.restart_all_nodes_with(to_version)
+            self.restart_all_nodes_with(from_version, to_version)
         else:
             raise Exception("Unrecognized bounce_type: " + str(bounce_type))
 
@@ -157,30 +157,36 @@ class StreamsUpgradeTest(Test):
         self.wait_for_verification(self.processor2, self.processed_msg, self.processor2.STDOUT_FILE)
         self.wait_for_verification(self.processor3, self.processed_msg, self.processor3.STDOUT_FILE)
 
-    def restart_all_nodes_with(self, version):
+    def restart_all_nodes_with(self, from_version, to_version):
         self.processor1.stop_node(self.processor1.node)
         self.processor2.stop_node(self.processor2.node)
         self.processor3.stop_node(self.processor3.node)
 
         # make sure the members have stopped
-        self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE)
-        self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE)
-        self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE)
+        if from_version.startswith("2."):
+            # some older versions crash on shutdown, so we allow crashes here.
+            self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor1.STDOUT_FILE)
+            self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor2.STDOUT_FILE)
+            self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor3.STDOUT_FILE)
+        else:
+            self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE)
+            self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE)
+            self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE)
 
         self.roll_logs(self.processor1, ".1-1")
         self.roll_logs(self.processor2, ".1-1")
         self.roll_logs(self.processor3, ".1-1")
 
-        self.set_version(self.processor1, version)
-        self.set_version(self.processor2, version)
-        self.set_version(self.processor3, version)
+        self.set_version(self.processor1, to_version)
+        self.set_version(self.processor2, to_version)
+        self.set_version(self.processor3, to_version)
 
         self.processor1.start_node(self.processor1.node)
         self.processor2.start_node(self.processor2.node)
         self.processor3.start_node(self.processor3.node)
 
         # double-check the version
-        kafka_version_str = self.get_version_string(version)
+        kafka_version_str = self.get_version_string(to_version)
         self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE)
         self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE)
         self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE)
@@ -226,8 +232,8 @@ class StreamsUpgradeTest(Test):
     def purge_state_dir(self, processor):
         processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
 
-    def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
-        kafka_version_str = self.get_version_string(new_version)
+    def do_stop_start_bounce(self, processor, upgrade_from, from_version, to_version, counter):
+        kafka_version_str = self.get_version_string(to_version)
 
         first_other_processor = None
         second_other_processor = None
@@ -252,7 +258,12 @@ class StreamsUpgradeTest(Test):
                 second_other_monitor.wait_until(self.processed_msg,
                                                 timeout_sec=60,
                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
-        node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+        if from_version.startswith("2."):
+            # some older versions crash on shutdown, so we allow crashes here.
+            node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)' %s" % processor.STDOUT_FILE, allow_fail=False)
+        else:
+            node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-CLOSED' %s" % processor.STDOUT_FILE, allow_fail=False)
 
         if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
             roll_counter = ".1-"  # second round of rolling bounces
@@ -261,7 +272,7 @@ class StreamsUpgradeTest(Test):
 
         self.roll_logs(processor, roll_counter + str(counter))
 
-        self.set_version(processor, new_version)
+        self.set_version(processor, to_version)
         processor.set_upgrade_from(upgrade_from)
 
         grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
@@ -273,7 +284,7 @@ class StreamsUpgradeTest(Test):
 
                         log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
-                                               err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
+                                               err_msg="Could not detect Kafka Streams version " + to_version + " on " + str(node.account))
                         first_other_monitor.wait_until(self.processed_msg,
                                                        timeout_sec=60,
                                                        err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))

Reply via email to