This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be88f5a  MINOR: Fix StreamsOptimizedTest (#9911)
be88f5a is described below

commit be88f5a1aa5efcd321d24befa7877d541628c714
Author: John Roesler <[email protected]>
AuthorDate: Tue Jan 19 14:57:34 2021 -0600

    MINOR: Fix StreamsOptimizedTest (#9911)
    
    We have seen recent system test timeouts associated with this test.
    Analysis revealed an excessive amount of time spent searching
    for test conditions in the logs.
    
    This change addresses the issue by dropping some unnecessary
    checks and using a more efficient log search mechanism.
    
    Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../tests/streams/streams_optimized_test.py        | 55 ++++++++++++----------
 1 file changed, 29 insertions(+), 26 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py 
b/tests/kafkatest/tests/streams/streams_optimized_test.py
index adea2ea..3209b25 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -15,6 +15,7 @@
 
 import time
 from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
 from kafkatest.services.streams import StreamsResetter
@@ -65,30 +66,41 @@ class StreamsOptimizedTest(Test):
 
         processors = [processor1, processor2, processor3]
 
-        # produce records continually during the test
+        self.logger.info("produce records continually during the test")
         self.producer.start()
 
-        # start all processors unoptimized
+        self.logger.info("start all processors unoptimized")
         for processor in processors:
             self.set_topics(processor)
             processor.CLEAN_NODE_ENABLED = False
             self.verify_running_repartition_topic_count(processor, 4)
 
+        self.logger.info("verify unoptimized")
         self.verify_processing(processors, verify_individual_operations=False)
 
+        self.logger.info("stop unoptimized")
         stop_processors(processors, self.stopped_message)
 
+        self.logger.info("reset")
         self.reset_application()
+        for processor in processors:
+            processor.node.account.ssh("mv " + processor.LOG_FILE + " " + 
processor.LOG_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.STDOUT_FILE + " " + 
processor.STDOUT_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.STDERR_FILE + " " + 
processor.STDERR_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.CONFIG_FILE + " " + 
processor.CONFIG_FILE + ".1", allow_fail=False)
 
-        # start again with topology optimized
+        self.logger.info("start again with topology optimized")
         for processor in processors:
             processor.OPTIMIZED_CONFIG = 'all'
             self.verify_running_repartition_topic_count(processor, 1)
 
+        self.logger.info("verify optimized")
         self.verify_processing(processors, verify_individual_operations=True)
 
+        self.logger.info("stop optimized")
         stop_processors(processors, self.stopped_message)
 
+        self.logger.info("teardown")
         self.producer.stop()
         self.kafka.stop()
         self.zookeeper.stop()
@@ -110,34 +122,25 @@ class StreamsOptimizedTest(Test):
                                        % repartition_topic_count + 
str(processor.node.account))
 
     def verify_processing(self, processors, verify_individual_operations):
+        # This test previously had logic to account for skewed assignments, in 
which not all processors may
+        # receive active assignments. I don't think this will happen anymore, 
but keep an eye out if we see
+        # test failures here. If that does resurface, note that the prior 
implementation was not correct.
+        # A better approach would be to make sure we see processing of each 
partition across the whole cluster
+        # instead of just expecting to see each node perform some processing.
         for processor in processors:
-            if not self.all_source_subtopology_tasks(processor):
-                if verify_individual_operations:
-                    for operation in self.operation_pattern.split('\|'):
-                        self.do_verify(processor, operation)
-                else:
-                    self.do_verify(processor, self.operation_pattern)
+            if verify_individual_operations:
+                for operation in self.operation_pattern.split('\|'):
+                    self.do_verify(processor, operation)
             else:
-                self.logger.info("Skipping processor %s with all source tasks" 
% processor.node.account)
+                self.do_verify(processor, self.operation_pattern)
 
     def do_verify(self, processor, pattern):
         self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % 
pattern)
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
monitor:
-            monitor.wait_until(pattern,
-                               timeout_sec=60,
-                               err_msg="Never saw processing of %s " % pattern 
+ str(processor.node.account))
-
-    def all_source_subtopology_tasks(self, processor):
-        retries = 0
-        while retries < 5:
-            found = list(processor.node.account.ssh_capture("sed -n 
's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % 
processor.LOG_FILE, allow_fail=True))
-            self.logger.info("Returned %s from assigned task check" % found)
-            if len(found) > 0:
-                return True
-            retries += 1
-            time.sleep(1)
-
-        return False
+        self.logger.info(list(processor.node.account.ssh_capture("ls -lh %s" % 
(processor.STDOUT_FILE), allow_fail=True)))
+        wait_until(
+            lambda: processor.node.account.ssh("grep --max-count 1 '%s' %s" % 
(pattern, processor.STDOUT_FILE), allow_fail=True) == 0,
+            timeout_sec=60
+        )
 
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic

Reply via email to the mailing list.