Repository: kafka Updated Branches: refs/heads/trunk d190d89db -> dc10b0ea0
MINOR: Fix race condition in TestVerifiableProducer sanity test ## Fixes race condition in TestVerifiableProducer sanity test: The test starts a producer, waits for at least 5 acks, and then logs in to the worker to grep for the producer process to figure out what version it is running. The problem was that the producer was set up to produce 1000 messages at a rate of 1000 msgs/s and then exit, so its typical runtime was only slightly above 1 second. Logging in to the Vagrant instance might take longer than that, in which case the producer has already exited, the process grep finds nothing, and the test fails. This commit doesn't really fix the issue - a proper fix would be to tell the producer to stick around until explicitly killed - but it increases the chances of the test passing (the throughput is lowered to num_messages/5, i.e. 200 msgs/s, so the producer runs for roughly 5 seconds), at the expense of a slightly longer runtime. ## Improves error reporting when is_version() fails Author: Magnus Edenhill <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #2765 from edenhill/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc10b0ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc10b0ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc10b0ea Branch: refs/heads/trunk Commit: dc10b0ea014974b450f2abcd4a58e09d5b988e9e Parents: d190d89 Author: Magnus Edenhill <[email protected]> Authored: Sun May 21 18:23:12 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Sun May 21 18:23:12 2017 -0700 ---------------------------------------------------------------------- tests/kafkatest/sanity_checks/test_kafka_version.py | 6 +++--- .../kafkatest/sanity_checks/test_verifiable_producer.py | 12 +++++++++--- tests/kafkatest/utils/util.py | 8 ++++++-- 3 files changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/dc10b0ea/tests/kafkatest/sanity_checks/test_kafka_version.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py index 7e65807..ca58ca5 100644 --- a/tests/kafkatest/sanity_checks/test_kafka_version.py +++ b/tests/kafkatest/sanity_checks/test_kafka_version.py @@ -42,7 +42,7 @@ class KafkaVersionTest(Test): node.version = LATEST_0_8_2 self.kafka.start() - assert is_version(node, [LATEST_0_8_2]) + assert is_version(node, [LATEST_0_8_2], logger=self.logger) @cluster(num_nodes=3) def test_multi_version(self): @@ -54,5 +54,5 @@ class KafkaVersionTest(Test): self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X" self.kafka.start() - assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring]) - assert is_version(self.kafka.nodes[1], [LATEST_0_8_2]) + assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring], logger=self.logger) + assert is_version(self.kafka.nodes[1], [LATEST_0_8_2], logger=self.logger) http://git-wip-us.apache.org/repos/asf/kafka/blob/dc10b0ea/tests/kafkatest/sanity_checks/test_verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index be10574..a008532 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -39,7 +39,7 @@ class TestVerifiableProducer(Test): self.num_messages = 1000 # This will produce to source kafka cluster self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, - max_messages=self.num_messages, throughput=1000) + max_messages=self.num_messages, throughput=self.num_messages/5) def setUp(self): self.zk.start() @@ -66,10 +66,16 @@ 
class TestVerifiableProducer(Test): # that this check works with DEV_BRANCH # When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X should show up because of the # way verifiable producer pulls in some development directories into its classpath + # + # If the test fails here because 'ps .. | grep' couldn't find the process it means + # the login and grep that is_version() performs is slower than + # the time it takes the producer to produce its messages. + # Easy fix is to decrease throughput= above, the good fix is to make the producer + # not terminate until explicitly killed in this case. if node.version <= LATEST_0_8_2: - assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring]) + assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring], logger=self.logger) else: - assert is_version(node, [node.version.vstring]) + assert is_version(node, [node.version.vstring], logger=self.logger) self.producer.wait() num_produced = self.producer.num_acked http://git-wip-us.apache.org/repos/asf/kafka/blob/dc10b0ea/tests/kafkatest/utils/util.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index f004ece..3be8d80 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -31,7 +31,7 @@ def _kafka_jar_versions(proc_string): return set(versions) -def is_version(node, version_list, proc_grep_string="kafka"): +def is_version(node, version_list, proc_grep_string="kafka", logger=None): """Heuristic to check that only the specified version appears in the classpath of the process A useful tool to aid in checking that service version apis are working correctly. 
""" @@ -39,7 +39,11 @@ def is_version(node, version_list, proc_grep_string="kafka"): assert len(lines) == 1 versions = _kafka_jar_versions(lines[0]) - return versions == {str(v) for v in version_list} + r = versions == {str(v) for v in version_list} + if not r and logger is not None: + logger.warning("%s: %s version mismatch: expected %s: actual %s" % \ + (str(node), proc_grep_string, version_list, versions)) + return r def is_int(msg):
