Repository: kafka
Updated Branches:
  refs/heads/trunk 7fad45557 -> d190d89db
HOTFIX: In Connect test with auto topic creation disabled, ensure precreated topic is always used

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3112 from ewencp/hotfix-precreate-topic

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d190d89d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d190d89d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d190d89d

Branch: refs/heads/trunk
Commit: d190d89dbc3df90e00f5e3c55507f67c5818504e
Parents: 7fad455
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Sun May 21 18:04:32 2017 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Sun May 21 18:04:32 2017 -0700

----------------------------------------------------------------------
 .../tests/connect/connect_distributed_test.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d190d89d/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 8fbc13b..5c7793a 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -67,7 +67,7 @@ class ConnectDistributedTest(Test):
         self.num_zk = 1
         self.num_brokers = 1
         self.topics = {
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
+            self.TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
 
         self.zk = ZookeeperService(test_context, self.num_zk)
@@ -75,7 +75,7 @@ class ConnectDistributedTest(Test):
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
-        self.broker_config_overrides = [["auto.create.topics.enable","false"]]
+        self.broker_config_overrides = [["auto.create.topics.enable", "false"]]
 
     def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
@@ -205,7 +205,7 @@ class ConnectDistributedTest(Test):
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.source = VerifiableSource(self.cc)
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC)
         self.source.start()
 
         wait_until(lambda: self.is_running(self.source), timeout_sec=30,
@@ -245,13 +245,13 @@ class ConnectDistributedTest(Test):
         self.cc.start()
 
         # use the verifiable source to produce a steady stream of messages
-        self.source = VerifiableSource(self.cc)
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC)
         self.source.start()
 
         wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30,
                    err_msg="Timeout expired waiting for source task to produce a message")
 
-        self.sink = VerifiableSink(self.cc)
+        self.sink = VerifiableSink(self.cc, topics=[self.TOPIC])
         self.sink.start()
 
         wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
@@ -289,7 +289,7 @@ class ConnectDistributedTest(Test):
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.source = VerifiableSource(self.cc)
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC)
         self.source.start()
 
         wait_until(lambda: self.is_running(self.source), timeout_sec=30,
@@ -351,9 +351,9 @@ class ConnectDistributedTest(Test):
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100)
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100)
         self.source.start()
-        self.sink = VerifiableSink(self.cc, tasks=num_tasks)
+        self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC])
         self.sink.start()
 
         for _ in range(3):
