This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 2d77746 MINOR: fixes on streams upgrade test (#5754)
2d77746 is described below

commit 2d77746a7beeb96d04ff3a907cde78f126cf6e85
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Sat Oct 13 22:39:24 2018 -0700

    MINOR: fixes on streams upgrade test (#5754)

    1. In test_upgrade_downgrade_brokers, allow duplicates to happen.
    2. In test_version_probing_upgrade, grep the generation numbers from brokers at the end, and check if they can ever be synchronized.

    Reviewers: John Roesler <j...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>
---
 .../tests/streams/streams_upgrade_test.py | 52 ++++++++++++++--------
 1 file changed, 34 insertions(+), 18 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 39e21bf..4314a35 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -122,7 +122,7 @@ class StreamsUpgradeTest(Test): self.processor1.stop() node = self.driver.node - node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) + node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False) self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) @@ -470,8 +470,6 @@ class StreamsUpgradeTest(Test): self.old_processors.remove(processor) self.upgraded_processors.append(processor) - current_generation = current_generation + 1 - log_monitor.wait_until("Kafka version : " + str(DEV_VERSION), timeout_sec=60, err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account)) @@ -480,16 +478,6 @@ class StreamsUpgradeTest(Test): timeout_sec=60, 
err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account)) - log_monitor.wait_until("Successfully joined group with generation " + str(current_generation), - timeout_sec=60, - err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(node.account)) - first_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation), - timeout_sec=60, - err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(first_other_node.account)) - second_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation), - timeout_sec=60, - err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(second_other_node.account)) - if processor == self.leader: self.update_leader() else: @@ -533,12 +521,34 @@ class StreamsUpgradeTest(Test): err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) # version probing should trigger second rebalance - current_generation = current_generation + 1 + # now we check that after consecutive rebalances we have synchronized generation + generation_synchronized = False + retries = 0 - for p in self.processors: - monitors[p].wait_until("Successfully joined group with generation " + str(current_generation), - timeout_sec=60, - err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(p.node.account)) + while retries < 10: + processor_found = self.extract_generation_from_logs(processor) + first_other_processor_found = self.extract_generation_from_logs(first_other_processor) + second_other_processor_found = self.extract_generation_from_logs(second_other_processor) + + if len(processor_found) > 0 and len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0: + self.logger.info("processor: " + 
str(processor_found)) + self.logger.info("first other processor: " + str(first_other_processor_found)) + self.logger.info("second other processor: " + str(second_other_processor_found)) + + processor_generation = self.extract_highest_generation(processor_found) + first_other_processor_generation = self.extract_highest_generation(first_other_processor_found) + second_other_processor_generation = self.extract_highest_generation(second_other_processor_found) + + if processor_generation == first_other_processor_generation and processor_generation == second_other_processor_generation: + current_generation = processor_generation + generation_synchronized = True + break + + time.sleep(5) + retries = retries + 1 + + if generation_synchronized == False: + raise Exception("Never saw all three processors have the synchronized generation number") if processor == self.leader: self.update_leader() @@ -550,6 +560,12 @@ class StreamsUpgradeTest(Test): return current_generation + def extract_generation_from_logs(self, processor): + return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True)) + + def extract_highest_generation(self, found_generations): + return int(found_generations[-1]) + def verify_metadata_no_upgraded_yet(self): for p in self.processors: found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))