Repository: kafka
Updated Branches:
  refs/heads/trunk 69269e76a -> cff03f8b6
KAFKA-2642; Run replication tests with SSL and SASL clients

For SSL and SASL replication tests, set security protocol for clients as well.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Reviewers: Ismael Juma <ism...@juma.me.uk>, Ben Stopford <benstopf...@gmail.com>, Geoff Anderson <ge...@confluent.io>, Jun Rao <jun...@gmail.com>

Closes #563 from rajinisivaram/KAFKA-2642

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cff03f8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cff03f8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cff03f8b

Branch: refs/heads/trunk
Commit: cff03f8b68604ce8c8c743801b075b655af23011
Parents: 69269e7
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Wed Dec 2 08:08:37 2015 -0600
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Dec 2 08:08:37 2015 -0600

----------------------------------------------------------------------
 tests/kafkatest/tests/compatibility_test.py | 2 +-
 tests/kafkatest/tests/quota_test.py | 2 +-
 tests/kafkatest/tests/replication_test.py | 9 +++++----
 tests/kafkatest/tests/security_rolling_upgrade_test.py | 2 +-
 tests/kafkatest/tests/upgrade_test.py | 2 +-
 5 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/compatibility_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py index 0310d2f..47e2752 100644 --- a/tests/kafkatest/tests/compatibility_test.py +++ b/tests/kafkatest/tests/compatibility_test.py @@ -32,7 +32,7 @@ class ClientCompatibilityTest(Test): self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: { "partitions": 3, "replication-factor": 3, - "min.insync.replicas": 2}}) 
+ 'configs': {"min.insync.replicas": 2}}}) self.zk.start() self.kafka.start() http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/quota_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py index 2649d7d..7c2ec59 100644 --- a/tests/kafkatest/tests/quota_test.py +++ b/tests/kafkatest/tests/quota_test.py @@ -50,7 +50,7 @@ class QuotaTest(Test): self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT', - topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'min.insync.replicas': 1}}, + topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}}, quota_config=self.quota_config, jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec', 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'], http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/replication_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index 6633a4f..a8f2337 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -93,9 +93,9 @@ class ReplicationTest(ProduceConsumeValidateTest): self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: { "partitions": 3, "replication-factor": 3, - "min.insync.replicas": 2} + 'configs': {"min.insync.replicas": 2}} }) - self.producer_throughput = 10000 + self.producer_throughput = 1000 self.num_producers = 1 self.num_consumers = 1 @@ -123,10 +123,11 @@ class ReplicationTest(ProduceConsumeValidateTest): - Validate that every acked message was consumed """ - self.kafka.security_protocol = 'PLAINTEXT' + self.kafka.security_protocol = security_protocol 
self.kafka.interbroker_security_protocol = security_protocol + new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int) self.kafka.start() self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self)) http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/security_rolling_upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py index 279cd26..1acf58b 100644 --- a/tests/kafkatest/tests/security_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py @@ -40,7 +40,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: { "partitions": 3, "replication-factor": 3, - "min.insync.replicas": 2}}) + 'configs': {"min.insync.replicas": 2}}}) self.zk.start() #reduce replica.lag.time.max.ms due to KAFKA-2827 http://git-wip-us.apache.org/repos/asf/kafka/blob/cff03f8b/tests/kafkatest/tests/upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py index 245129a..ea6f7ac 100644 --- a/tests/kafkatest/tests/upgrade_test.py +++ b/tests/kafkatest/tests/upgrade_test.py @@ -33,7 +33,7 @@ class TestUpgrade(ProduceConsumeValidateTest): self.kafka = 
KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: { "partitions": 3, "replication-factor": 3, - "min.insync.replicas": 2}}) + 'configs': {"min.insync.replicas": 2}}}) self.zk.start() self.kafka.start()