Repository: kafka Updated Branches: refs/heads/trunk 1d055f755 -> c0a62b70a
KAFKA-4055; System tests for secure quotas Fix existing client-id quota test which currently don't configure quota overrides correctly. Add new tests for user and (user, client-id) quota overrides and default quotas. Author: Rajini Sivaram <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1860 from rajinisivaram/KAFKA-4055 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c0a62b70 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c0a62b70 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c0a62b70 Branch: refs/heads/trunk Commit: c0a62b70a8eadc550c937bb18e0203ab691618f5 Parents: 1d055f7 Author: Rajini Sivaram <[email protected]> Authored: Sun Sep 25 17:01:45 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Sun Sep 25 17:01:45 2016 -0700 ---------------------------------------------------------------------- .../kafkatest/services/kafka/config_property.py | 2 - tests/kafkatest/services/kafka/kafka.py | 3 +- .../services/kafka/templates/kafka.properties | 16 --- tests/kafkatest/tests/client/quota_test.py | 118 +++++++++++++------ 4 files changed, 85 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/config_property.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index 217e970..e329151 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -144,8 +144,6 @@ From KafkaConfig.scala /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" - val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides" - val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides" val NumQuotaSamplesProp = "quota.window.num" val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 734eb5c..4ce86b8 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -67,7 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, - authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, + authorizer_class_name=None, topics=None, version=TRUNK, jmx_object_names=None, jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000): """ :type context @@ -78,7 +78,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) self.zk = zk - self.quota_config = quota_config self.security_protocol = security_protocol self.interbroker_security_protocol = interbroker_security_protocol http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index c02c64f..06ec603 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -33,22 +33,6 @@ log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false -{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %} -quota.producer.default={{ quota_config.quota_producer_default }} -{% endif %} - -{% if quota_config.quota_consumer_default is defined and quota_config.quota_consumer_default is not none %} -quota.consumer.default={{ quota_config.quota_consumer_default }} -{% endif %} - -{% if quota_config.quota_producer_bytes_per_second_overrides is defined and quota_config.quota_producer_bytes_per_second_overrides is not none %} -quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }} -{% endif %} - -{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and quota_config.quota_consumer_bytes_per_second_overrides is not none %} -quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }} -{% endif %} - security.inter.broker.protocol={{ security_config.interbroker_security_protocol }} ssl.keystore.location=/mnt/security/test.keystore.jks http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/tests/client/quota_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 7c2ec59..2abd089 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -14,13 +14,77 @@ # limitations under the License. from ducktape.tests.test import Test -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.performance import ProducerPerformanceService from kafkatest.services.console_consumer import ConsoleConsumer +class QuotaConfig(object): + CLIENT_ID = 'client-id' + USER = 'user' + USER_CLIENT = '(user, client-id)' + + LARGE_QUOTA = 1000 * 1000 * 1000 + USER_PRINCIPAL = 'CN=systemtest' + + def __init__(self, quota_type, override_quota, kafka): + if quota_type == QuotaConfig.CLIENT_ID: + if override_quota: + self.client_id = 'overridden_id' + self.producer_quota = 3750000 + self.consumer_quota = 3000000 + self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', self.client_id]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None]) + else: + self.client_id = 'default_id' + self.producer_quota = 2500000 + self.consumer_quota = 2000000 + self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['clients', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id']) + elif quota_type == QuotaConfig.USER: + if override_quota: + self.client_id = 'some_id' + self.producer_quota = 3750000 + self.consumer_quota = 3000000 + self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id]) + else: + self.client_id = 'some_id' + self.producer_quota = 2500000 + self.consumer_quota = 2000000 + self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None]) + elif quota_type == QuotaConfig.USER_CLIENT: + if override_quota: + self.client_id = 'overridden_id' + self.producer_quota = 3750000 + self.consumer_quota = 3000000 + self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', self.client_id]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', self.client_id]) + else: + self.client_id = 'default_id' + self.producer_quota = 2500000 + self.consumer_quota = 2000000 + self.configure_quota(kafka, self.producer_quota, self.consumer_quota, ['users', None, 'clients', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['users', None]) + self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None]) + + def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args): + node = kafka.nodes[0] + cmd = "%s --zookeeper %s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \ + (kafka.path.script("kafka-configs.sh", node), kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate) + cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1]) + if len(entity_args) > 2: + cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3]) + node.account.ssh(cmd) + + def entity_name_opt(self, name): + return " --entity-default" if name is None else " --entity-name " + name class QuotaTest(Test): """ @@ -36,22 +100,16 @@ class QuotaTest(Test): self.topic = 'test_topic' self.logger.info('use topic ' + self.topic) - # quota related parameters - self.quota_config = {'quota_producer_default': 2500000, - 'quota_consumer_default': 2000000, - 'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000', - 'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'} self.maximum_client_deviation_percentage = 100.0 self.maximum_broker_deviation_percentage = 5.0 - self.num_records = 100000 + self.num_records = 50000 self.record_size = 3000 self.zk = ZookeeperService(test_context, num_nodes=1) self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, - security_protocol='PLAINTEXT', - interbroker_security_protocol='PLAINTEXT', + security_protocol='SSL', authorizer_class_name='', + interbroker_security_protocol='SSL', topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}}, - quota_config=self.quota_config, jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec', 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'], jmx_attributes=['OneMinuteRate']) @@ -66,24 +124,27 @@ class QuotaTest(Test): """Override this since we're adding services outside of the constructor""" return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers - @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1) - @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1) - @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2) - def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1): + @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False]) + @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2) + def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1): + self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka) + producer_client_id = self.quota_config.client_id + consumer_client_id = self.quota_config.client_id + # Produce all messages producer = ProducerPerformanceService( self.test_context, producer_num, self.kafka, - topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id, - jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate']) + topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id, + jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate']) producer.run() # Consume all messages consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic, - new_consumer=False, - consumer_timeout_ms=60000, client_id=consumer_id, - jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id], - jmx_attributes=['OneMinuteRate']) + new_consumer=True, + consumer_timeout_ms=60000, client_id=consumer_client_id, + jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id], + jmx_attributes=['bytes-consumed-rate']) consumer.run() for idx, messages in consumer.messages_consumed.iteritems(): @@ -118,7 +179,7 @@ class QuotaTest(Test): # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100) producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name] - producer_quota_bps = self.get_producer_quota(producer.client_id) + producer_quota_bps = self.quota_config.producer_quota self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps)) if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1): success = False @@ -136,9 +197,9 @@ class QuotaTest(Test): (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage) # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100) - consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate' % consumer.client_id + consumer_attribute_name = 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate' % consumer.client_id consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name] - consumer_quota_bps = self.get_consumer_quota(consumer.client_id) + consumer_quota_bps = self.quota_config.consumer_quota self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps)) if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1): success = False @@ -157,14 +218,3 @@ class QuotaTest(Test): return success, msg - def get_producer_quota(self, client_id): - overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')} - if client_id in overridden_quotas: - return float(overridden_quotas[client_id]) - return self.quota_config['quota_producer_default'] - - def get_consumer_quota(self, client_id): - overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')} - if client_id in overridden_quotas: - return float(overridden_quotas[client_id]) - return self.quota_config['quota_consumer_default']
