This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3453e9e  HOTFIX: fix system test race condition (#7836)
3453e9e is described below

commit 3453e9e2eee1400901ef8e1965d657b825d5d64a
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Dec 31 21:44:31 2019 -0500

    HOTFIX: fix system test race condition (#7836)
    
    In some system tests a Streams app is started and then prints a message to 
stdout, which the system test waits for to confirm the node has successfully 
been brought up. It then greps for certain log messages in a retriable loop.
    
    But waiting on the Streams app to start/print to stdout does not mean the 
log file has been created yet, so the grep may return an error. Although this 
occurs in a retriable loop it is assumed that grep will not fail, and the 
result is piped to wc and then blindly converted to an int in the python 
function, which fails since the error message is a string (throws ValueError)
    
    We should catch the ValueError and return a 0 so it can try again rather 
than immediately crash
    
    Reviewers: Bill Bejeck <[email protected]>, John Roesler 
<[email protected]>, Guozhang Wang <[email protected]>
---
 .../apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 2 +-
 tests/kafkatest/tests/streams/base_streams_test.py                  | 6 +++++-
 .../kafkatest/tests/streams/streams_broker_down_resilience_test.py  | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index b605f46..ac4e120 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -97,7 +97,7 @@ public class StreamsBrokerDownResilienceTest {
                 public void apply(final String key, final String value) {
                     System.out.println("received key " + key + " and value " + 
value);
                     messagesProcessed++;
-                    System.out.println("processed" + messagesProcessed + 
"messages");
+                    System.out.println("processed " + messagesProcessed + " 
messages");
                     System.out.flush();
                 }
             }).to(SINK_TOPIC);
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py 
b/tests/kafkatest/tests/streams/base_streams_test.py
index 53e4231..256693c 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -98,5 +98,9 @@ class BaseStreamsTest(KafkaTest):
     @staticmethod
     def verify_from_file(processor, message, file):
         result = processor.node.account.ssh_output("grep -E '%s' %s | wc -l" % 
(message, file), allow_fail=False)
-        return int(result)
+        try:
+          return int(result)
+        except ValueError:
+          self.logger.warn("Command failed with ValueError: " + result)
+          return 0
 
diff --git 
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py 
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index ee5feae..58f3b18 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -28,7 +28,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
     outputTopic = "streamsResilienceSink"
     client_id = "streams-broker-resilience-verify-consumer"
     num_messages = 10000
-    message = "processed[0-9]*messages"
+    message = "processed [0-9]* messages"
     connected_message = "Discovered group coordinator"
 
     def __init__(self, test_context):

Reply via email to