This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be88f5a MINOR: Fix StreamsOptimizedTest (#9911)
be88f5a is described below
commit be88f5a1aa5efcd321d24befa7877d541628c714
Author: John Roesler <[email protected]>
AuthorDate: Tue Jan 19 14:57:34 2021 -0600
MINOR: Fix StreamsOptimizedTest (#9911)
We have seen recent system test timeouts associated with this test.
Analysis revealed an excessive amount of time spent searching
for test conditions in the logs.
This change addresses the issue by dropping some unnecessary
checks and using a more efficient log search mechanism.
Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
---
.../tests/streams/streams_optimized_test.py | 55 ++++++++++++----------
1 file changed, 29 insertions(+), 26 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index adea2ea..3209b25 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -15,6 +15,7 @@
import time
from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
from kafkatest.services.streams import StreamsResetter
@@ -65,30 +66,41 @@ class StreamsOptimizedTest(Test):
processors = [processor1, processor2, processor3]
- # produce records continually during the test
+ self.logger.info("produce records continually during the test")
self.producer.start()
- # start all processors unoptimized
+ self.logger.info("start all processors unoptimized")
for processor in processors:
self.set_topics(processor)
processor.CLEAN_NODE_ENABLED = False
self.verify_running_repartition_topic_count(processor, 4)
+ self.logger.info("verify unoptimized")
self.verify_processing(processors, verify_individual_operations=False)
+ self.logger.info("stop unoptimized")
stop_processors(processors, self.stopped_message)
+ self.logger.info("reset")
self.reset_application()
+ for processor in processors:
+ processor.node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + ".1", allow_fail=False)
+ processor.node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + ".1", allow_fail=False)
+ processor.node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + ".1", allow_fail=False)
+ processor.node.account.ssh("mv " + processor.CONFIG_FILE + " " + processor.CONFIG_FILE + ".1", allow_fail=False)
- # start again with topology optimized
+ self.logger.info("start again with topology optimized")
for processor in processors:
processor.OPTIMIZED_CONFIG = 'all'
self.verify_running_repartition_topic_count(processor, 1)
+ self.logger.info("verify optimized")
self.verify_processing(processors, verify_individual_operations=True)
+ self.logger.info("stop optimized")
stop_processors(processors, self.stopped_message)
+ self.logger.info("teardown")
self.producer.stop()
self.kafka.stop()
self.zookeeper.stop()
@@ -110,34 +122,25 @@ class StreamsOptimizedTest(Test):
% repartition_topic_count + str(processor.node.account))
def verify_processing(self, processors, verify_individual_operations):
+ # This test previously had logic to account for skewed assignments, in which not all processors may
+ # receive active assignments. I don't think this will happen anymore, but keep an eye out if we see
+ # test failures here. If that does resurface, note that the prior implementation was not correct.
+ # A better approach would be to make sure we see processing of each partition across the whole cluster
+ # instead of just expecting to see each node perform some processing.
for processor in processors:
- if not self.all_source_subtopology_tasks(processor):
- if verify_individual_operations:
- for operation in self.operation_pattern.split('\|'):
- self.do_verify(processor, operation)
- else:
- self.do_verify(processor, self.operation_pattern)
+ if verify_individual_operations:
+ for operation in self.operation_pattern.split('\|'):
+ self.do_verify(processor, operation)
else:
- self.logger.info("Skipping processor %s with all source tasks" % processor.node.account)
+ self.do_verify(processor, self.operation_pattern)
def do_verify(self, processor, pattern):
self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
- with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
- monitor.wait_until(pattern,
- timeout_sec=60,
- err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
-
- def all_source_subtopology_tasks(self, processor):
- retries = 0
- while retries < 5:
- found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE, allow_fail=True))
- self.logger.info("Returned %s from assigned task check" % found)
- if len(found) > 0:
- return True
- retries += 1
- time.sleep(1)
-
- return False
+ self.logger.info(list(processor.node.account.ssh_capture("ls -lh %s" % (processor.STDOUT_FILE), allow_fail=True)))
+ wait_until(
+ lambda: processor.node.account.ssh("grep --max-count 1 '%s' %s" % (pattern, processor.STDOUT_FILE), allow_fail=True) == 0,
+ timeout_sec=60
+ )
def set_topics(self, processor):
processor.INPUT_TOPIC = self.input_topic