Repository: kafka Updated Branches: refs/heads/trunk 6f7ed15da -> 62e043a86
http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/security_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py index b6bc656..4edbcff 100644 --- a/tests/kafkatest/tests/core/security_test.py +++ b/tests/kafkatest/tests/core/security_test.py @@ -13,7 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.mark import parametrize +from ducktape.mark.resource import cluster +from ducktape.utils.util import wait_until +from ducktape.errors import TimeoutError from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -23,20 +27,19 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.services.security.security_config import SslStores from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -import time class TestSslStores(SslStores): - def __init__(self): - super(TestSslStores, self).__init__() - self.invalid_hostname = False + def __init__(self, local_scratch_dir, valid_hostname=True): + super(TestSslStores, self).__init__(local_scratch_dir) + self.valid_hostname = valid_hostname self.generate_ca() self.generate_truststore() def hostname(self, node): - if (self.invalid_hostname): - return "invalidhost" - else: + if self.valid_hostname: return super(TestSslStores, self).hostname(node) + else: + return "invalidhostname" class SecurityTest(ProduceConsumeValidateTest): """ @@ -62,6 +65,18 @@ class SecurityTest(ProduceConsumeValidateTest): def setUp(self): self.zk.start() + def producer_consumer_have_expected_error(self, error): + try: + for node in self.producer.nodes: + node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE)) + for node in self.consumer.nodes: + node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE)) + except RemoteCommandError: + return False + + return True + + @cluster(num_nodes=7) @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL') @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol): @@ -74,29 +89,35 @@ class SecurityTest(ProduceConsumeValidateTest): self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = interbroker_security_protocol - SecurityConfig.ssl_stores = TestSslStores() + SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False) - SecurityConfig.ssl_stores.invalid_hostname = True self.kafka.start() self.create_producer_and_consumer() self.producer.log_level = "TRACE" + self.producer.start() self.consumer.start() - time.sleep(10) - assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname" - error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE' - for node in self.producer.nodes: - node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE)) - for node in self.consumer.nodes: - node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE)) + try: + wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5) + + # Fail quickly if messages are successfully acked + raise RuntimeError("Messages published successfully but should not have!" + " Endpoint validation did not fail with invalid hostname") + except TimeoutError: + # expected + pass + + error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE' + wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5) self.producer.stop() self.consumer.stop() self.producer.log_level = "INFO" - SecurityConfig.ssl_stores.invalid_hostname = False + SecurityConfig.ssl_stores.valid_hostname = True for node in self.kafka.nodes: self.kafka.restart_node(node, clean_shutdown=True) + self.create_producer_and_consumer() self.run_produce_consume_validate() http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/simple_consumer_shell_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/simple_consumer_shell_test.py b/tests/kafkatest/tests/core/simple_consumer_shell_test.py index 74a7eeb..882aae7 100644 --- a/tests/kafkatest/tests/core/simple_consumer_shell_test.py +++ b/tests/kafkatest/tests/core/simple_consumer_shell_test.py @@ -16,6 +16,8 @@ from ducktape.utils.util import wait_until from ducktape.tests.test import Test +from ducktape.mark.resource import cluster + from kafkatest.services.simple_consumer_shell import SimpleConsumerShell from kafkatest.services.verifiable_producer import VerifiableProducer @@ -26,6 +28,7 @@ MAX_MESSAGES = 100 NUM_PARTITIONS = 1 REPLICATION_FACTOR = 1 + class SimpleConsumerShellTest(Test): """ Tests SimpleConsumerShell tool @@ -61,6 +64,7 @@ class SimpleConsumerShellTest(Test): self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC) self.simple_consumer_shell.start() + @cluster(num_nodes=4) def test_simple_consumer_shell(self): """ Tests if SimpleConsumerShell is fetching expected records http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/throttling_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py index 2e21322..9684099 100644 --- a/tests/kafkatest/tests/core/throttling_test.py +++ b/tests/kafkatest/tests/core/throttling_test.py @@ -16,6 +16,7 @@ import time import math from ducktape.mark import parametrize +from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from kafkatest.services.performance import ProducerPerformanceService @@ -137,6 +138,7 @@ class ThrottlingTest(ProduceConsumeValidateTest): estimated_throttled_time, time_taken)) + @cluster(num_nodes=10) @parametrize(bounce_brokers=False) @parametrize(bounce_brokers=True) def test_throttled_reassignment(self, bounce_brokers): http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index 15a9696..34af4eb 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -14,6 +14,7 @@ # limitations under the License. from ducktape.mark import parametrize +from ducktape.mark.resource import cluster import json @@ -60,10 +61,13 @@ class TestUpgrade(ProduceConsumeValidateTest): node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version self.kafka.start_node(node) + @cluster(num_nodes=6) @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False) @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"]) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False) + @cluster(num_nodes=7) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL") + @cluster(num_nodes=6) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"]) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"]) @@ -71,6 +75,7 @@ class TestUpgrade(ProduceConsumeValidateTest): @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"]) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"]) + @cluster(num_nodes=7) @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False) @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False) def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py index 0cfdf16..f8b2146 100644 --- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py +++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py @@ -14,6 +14,7 @@ # limitations under the License. from ducktape.mark import matrix +from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -92,7 +93,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest): self.kafka.stop_node(node) self.kafka.start_node(node) - @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"]) + @cluster(num_nodes=9) + @matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"]) def test_zk_security_upgrade(self, security_protocol): self.zk.start() self.kafka.security_protocol = security_protocol @@ -103,7 +105,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest): self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group) - if(self.no_sasl): + if self.no_sasl: self.kafka.start() else: self.kafka.start(self.zk.zk_principals) http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/streams/streams_bounce_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py index d674641..169bbc1 100644 --- a/tests/kafkatest/tests/streams/streams_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_bounce_test.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import ignore +from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService import time + class StreamsBounceTest(KafkaTest): """ Simple test of Kafka Streams. @@ -41,6 +42,7 @@ class StreamsBounceTest(KafkaTest): self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) + @cluster(num_nodes=5) def test_bounce(self): """ Start a smoke test client, then abort (kill -9) and restart it a few times. @@ -51,11 +53,11 @@ class StreamsBounceTest(KafkaTest): self.processor1.start() - time.sleep(15); + time.sleep(15) self.processor1.abortThenRestart() - time.sleep(15); + time.sleep(15) # enable this after we add change log partition replicas #self.kafka.signal_leader("data") http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/streams/streams_smoke_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py index ea05c5f..bc84878 100644 --- a/tests/kafkatest/tests/streams/streams_smoke_test.py +++ b/tests/kafkatest/tests/streams/streams_smoke_test.py @@ -13,12 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import ignore + +from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService import time + class StreamsSmokeTest(KafkaTest): """ Simple test of Kafka Streams. @@ -45,6 +47,7 @@ class StreamsSmokeTest(KafkaTest): self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) @ignore + @cluster(num_nodes=7) def test_streams(self): """ Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times. @@ -56,14 +59,14 @@ class StreamsSmokeTest(KafkaTest): self.processor1.start() self.processor2.start() - time.sleep(15); + time.sleep(15) self.processor3.start() self.processor1.stop() - time.sleep(15); + time.sleep(15) - self.processor4.start(); + self.processor4.start() self.driver.wait() self.driver.stop() http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/tools/log4j_appender_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py index 42cfeea..7e0b9ee 100644 --- a/tests/kafkatest/tests/tools/log4j_appender_test.py +++ b/tests/kafkatest/tests/tools/log4j_appender_test.py @@ -17,6 +17,7 @@ from ducktape.utils.util import wait_until from ducktape.tests.test import Test from ducktape.mark import matrix +from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -27,6 +28,7 @@ from kafkatest.services.security.security_config import SecurityConfig TOPIC = "topic-log4j-appender" MAX_MESSAGES = 100 + class Log4jAppenderTest(Test): """ Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints to a Kafka topic @@ -62,7 +64,6 @@ class Log4jAppenderTest(Test): self.logger.debug("Received message: %s" % msg) self.messages_received_count += 1 - def start_consumer(self, security_protocol): enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, @@ -70,7 +71,10 @@ class Log4jAppenderTest(Test): message_validator=self.custom_message_validator) self.consumer.start() - @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']) + @cluster(num_nodes=4) + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + @cluster(num_nodes=5) + @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL']) def test_log4j_appender(self, security_protocol='PLAINTEXT'): """ Tests if KafkaLog4jAppender is producing to Kafka topic http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/kafkatest/tests/tools/replica_verification_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py index 1b625e9..a5b3390 100644 --- a/tests/kafkatest/tests/tools/replica_verification_test.py +++ b/tests/kafkatest/tests/tools/replica_verification_test.py @@ -16,8 +16,9 @@ from ducktape.utils.util import wait_until from ducktape.tests.test import Test -from kafkatest.services.verifiable_producer import VerifiableProducer +from ducktape.mark.resource import cluster +from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.replica_verification_tool import ReplicaVerificationTool @@ -59,9 +60,8 @@ class ReplicaVerificationToolTest(Test): def start_producer(self, max_messages, acks, timeout): # This will produce to kafka cluster + current_acked = 0 self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages) - current_acked = self.producer.num_acked - self.logger.info("current_acked = %s" % current_acked) self.producer.start() wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout, err_msg="Timeout awaiting messages to be produced and acked") @@ -69,6 +69,7 @@ class ReplicaVerificationToolTest(Test): def stop_producer(self): self.producer.stop() + @cluster(num_nodes=6) def test_replica_lags(self, security_protocol='PLAINTEXT'): """ Tests ReplicaVerificationTool @@ -77,6 +78,7 @@ class ReplicaVerificationToolTest(Test): self.start_kafka(security_protocol, security_protocol) self.start_replica_verification_tool(security_protocol) self.start_producer(max_messages=10, acks=-1, timeout=15) + # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, err_msg="Timed out waiting to reach zero replica lags.") http://git-wip-us.apache.org/repos/asf/kafka/blob/62e043a8/tests/setup.py ---------------------------------------------------------------------- diff --git a/tests/setup.py b/tests/setup.py index cae0a3f..e43a4ab 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -50,7 +50,7 @@ setup(name="kafkatest", license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.5.3", "requests>=2.5.0"], + install_requires=["ducktape==0.6.0", "requests>=2.5.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, )
