This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d77746  MINOR: fixes on streams upgrade test (#5754)
2d77746 is described below

commit 2d77746a7beeb96d04ff3a907cde78f126cf6e85
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Sat Oct 13 22:39:24 2018 -0700

    MINOR: fixes on streams upgrade test (#5754)
    
    1. In test_upgrade_downgrade_brokers, allow duplicates to happen.
    2. In test_version_probing_upgrade, grep the generation numbers from 
the Streams client logs at the end, and check whether they eventually 
become synchronized.
    
    Reviewers: John Roesler <j...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>, Bill Bejeck <b...@confluent.io>
---
 .../tests/streams/streams_upgrade_test.py          | 52 ++++++++++++++--------
 1 file changed, 34 insertions(+), 18 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 39e21bf..4314a35 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -122,7 +122,7 @@ class StreamsUpgradeTest(Test):
         self.processor1.stop()
 
         node = self.driver.node
-        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % 
self.driver.STDOUT_FILE, allow_fail=False)
+        node.account.ssh("grep -E 
'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s" % 
self.driver.STDOUT_FILE, allow_fail=False)
         self.processor1.node.account.ssh_capture("grep 
SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
 
     @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
@@ -470,8 +470,6 @@ class StreamsUpgradeTest(Test):
                     self.old_processors.remove(processor)
                     self.upgraded_processors.append(processor)
 
-                    current_generation = current_generation + 1
-
                     log_monitor.wait_until("Kafka version : " + 
str(DEV_VERSION),
                                            timeout_sec=60,
                                            err_msg="Could not detect Kafka 
Streams version " + str(DEV_VERSION) + " in " + str(node.account))
@@ -480,16 +478,6 @@ class StreamsUpgradeTest(Test):
                                            timeout_sec=60,
                                            err_msg="Could not detect 
FutureStreamsPartitionAssignor in " + str(node.account))
 
-                    log_monitor.wait_until("Successfully joined group with 
generation " + str(current_generation),
-                                           timeout_sec=60,
-                                           err_msg="Never saw output 
'Successfully joined group with generation " + str(current_generation) + "' on" 
+ str(node.account))
-                    first_other_monitor.wait_until("Successfully joined group 
with generation " + str(current_generation),
-                                                   timeout_sec=60,
-                                                   err_msg="Never saw output 
'Successfully joined group with generation " + str(current_generation) + "' on" 
+ str(first_other_node.account))
-                    second_other_monitor.wait_until("Successfully joined group 
with generation " + str(current_generation),
-                                                    timeout_sec=60,
-                                                    err_msg="Never saw output 
'Successfully joined group with generation " + str(current_generation) + "' on" 
+ str(second_other_node.account))
-
                     if processor == self.leader:
                         self.update_leader()
                     else:
@@ -533,12 +521,34 @@ class StreamsUpgradeTest(Test):
                                            err_msg="Could not detect 
'Triggering new rebalance' at upgrading node " + str(node.account))
 
                     # version probing should trigger second rebalance
-                    current_generation = current_generation + 1
+                    # now we check that after consecutive rebalances we have 
synchronized generation
+                    generation_synchronized = False
+                    retries = 0
 
-                    for p in self.processors:
-                        monitors[p].wait_until("Successfully joined group with 
generation " + str(current_generation),
-                                               timeout_sec=60,
-                                               err_msg="Never saw output 
'Successfully joined group with generation " + str(current_generation) + "' on" 
+ str(p.node.account))
+                    while retries < 10:
+                        processor_found = 
self.extract_generation_from_logs(processor)
+                        first_other_processor_found = 
self.extract_generation_from_logs(first_other_processor)
+                        second_other_processor_found = 
self.extract_generation_from_logs(second_other_processor)
+
+                        if len(processor_found) > 0 and 
len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0:
+                            self.logger.info("processor: " + 
str(processor_found))
+                            self.logger.info("first other processor: " + 
str(first_other_processor_found))
+                            self.logger.info("second other processor: " + 
str(second_other_processor_found))
+
+                            processor_generation = 
self.extract_highest_generation(processor_found)
+                            first_other_processor_generation = 
self.extract_highest_generation(first_other_processor_found)
+                            second_other_processor_generation = 
self.extract_highest_generation(second_other_processor_found)
+
+                            if processor_generation == 
first_other_processor_generation and processor_generation == 
second_other_processor_generation:
+                                current_generation = processor_generation
+                                generation_synchronized = True
+                                break
+
+                        time.sleep(5)
+                        retries = retries + 1
+
+                    if generation_synchronized == False:
+                        raise Exception("Never saw all three processors have 
the synchronized generation number")
 
                     if processor == self.leader:
                         self.update_leader()
@@ -550,6 +560,12 @@ class StreamsUpgradeTest(Test):
 
         return current_generation
 
+    def extract_generation_from_logs(self, processor):
+        return list(processor.node.account.ssh_capture("grep \"Successfully 
joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == 
\"generation\") beginning=i+1; if($i== 
\"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i 
}; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % 
processor.LOG_FILE, allow_fail=True))
+
+    def extract_highest_generation(self, found_generations):
+        return int(found_generations[-1])
+
     def verify_metadata_no_upgraded_yet(self):
         for p in self.processors:
             found = list(p.node.account.ssh_capture("grep \"Sent a version 4 
subscription and group leader.s latest supported version is 5. Upgrading 
subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, 
allow_fail=True))

Reply via email to the mailing list.