Repository: kafka
Updated Branches:
  refs/heads/trunk 1d055f755 -> c0a62b70a


KAFKA-4055; System tests for secure quotas

Fix the existing client-id quota test, which currently doesn't configure
quota overrides correctly. Add new tests for user and (user, client-id)
quota overrides and default quotas.

Author: Rajini Sivaram <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1860 from rajinisivaram/KAFKA-4055


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c0a62b70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c0a62b70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c0a62b70

Branch: refs/heads/trunk
Commit: c0a62b70a8eadc550c937bb18e0203ab691618f5
Parents: 1d055f7
Author: Rajini Sivaram <[email protected]>
Authored: Sun Sep 25 17:01:45 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Sun Sep 25 17:01:45 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/kafka/config_property.py |   2 -
 tests/kafkatest/services/kafka/kafka.py         |   3 +-
 .../services/kafka/templates/kafka.properties   |  16 ---
 tests/kafkatest/tests/client/quota_test.py      | 118 +++++++++++++------
 4 files changed, 85 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py 
b/tests/kafkatest/services/kafka/config_property.py
index 217e970..e329151 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -144,8 +144,6 @@ From KafkaConfig.scala
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
-  val ProducerQuotaBytesPerSecondOverridesProp = 
"quota.producer.bytes.per.second.overrides"
-  val ConsumerQuotaBytesPerSecondOverridesProp = 
"quota.consumer.bytes.per.second.overrides"
   val NumQuotaSamplesProp = "quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 734eb5c..4ce86b8 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -67,7 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAINTEXT, 
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
-                 authorizer_class_name=None, topics=None, version=TRUNK, 
quota_config=None, jmx_object_names=None,
+                 authorizer_class_name=None, topics=None, version=TRUNK, 
jmx_object_names=None,
                  jmx_attributes=[], zk_connect_timeout=5000, 
zk_session_timeout=6000):
         """
         :type context
@@ -78,7 +78,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
 
         self.zk = zk
-        self.quota_config = quota_config
 
         self.security_protocol = security_protocol
         self.interbroker_security_protocol = interbroker_security_protocol

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties 
b/tests/kafkatest/services/kafka/templates/kafka.properties
index c02c64f..06ec603 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -33,22 +33,6 @@ log.segment.bytes=1073741824
 log.retention.check.interval.ms=300000
 log.cleaner.enable=false
 
-{% if quota_config.quota_producer_default is defined and 
quota_config.quota_producer_default is not none %}
-quota.producer.default={{ quota_config.quota_producer_default }}
-{% endif %}
-
-{% if quota_config.quota_consumer_default is defined and 
quota_config.quota_consumer_default is not none %}
-quota.consumer.default={{ quota_config.quota_consumer_default }}
-{% endif %}
-
-{% if quota_config.quota_producer_bytes_per_second_overrides is defined and 
quota_config.quota_producer_bytes_per_second_overrides is not none %}
-quota.producer.bytes.per.second.overrides={{ 
quota_config.quota_producer_bytes_per_second_overrides }}
-{% endif %}
-
-{% if quota_config.quota_consumer_bytes_per_second_overrides is defined and 
quota_config.quota_consumer_bytes_per_second_overrides is not none %}
-quota.consumer.bytes.per.second.overrides={{ 
quota_config.quota_consumer_bytes_per_second_overrides }}
-{% endif %}
-
 security.inter.broker.protocol={{ 
security_config.interbroker_security_protocol }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0a62b70/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py 
b/tests/kafkatest/tests/client/quota_test.py
index 7c2ec59..2abd089 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -14,13 +14,77 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
-from ducktape.mark import parametrize
+from ducktape.mark import matrix, parametrize
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.performance import ProducerPerformanceService
 from kafkatest.services.console_consumer import ConsoleConsumer
 
+class QuotaConfig(object):
+    CLIENT_ID = 'client-id'
+    USER = 'user'
+    USER_CLIENT = '(user, client-id)'
+
+    LARGE_QUOTA = 1000 * 1000 * 1000
+    USER_PRINCIPAL = 'CN=systemtest'
+
+    def __init__(self, quota_type, override_quota, kafka):
+        if quota_type == QuotaConfig.CLIENT_ID:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, 
self.consumer_quota, ['clients', self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['clients', None])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, 
self.consumer_quota, ['clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['clients', 'overridden_id'])
+        elif quota_type == QuotaConfig.USER:
+            if override_quota:
+                self.client_id = 'some_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, 
self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'some_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, 
self.consumer_quota, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['clients', None])
+        elif quota_type == QuotaConfig.USER_CLIENT:
+            if override_quota:
+                self.client_id = 'overridden_id'
+                self.producer_quota = 3750000
+                self.consumer_quota = 3000000
+                self.configure_quota(kafka, self.producer_quota, 
self.consumer_quota, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', 
self.client_id])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['users', QuotaConfig.USER_PRINCIPAL, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['clients', self.client_id])
+            else:
+                self.client_id = 'default_id'
+                self.producer_quota = 2500000
+                self.consumer_quota = 2000000
+                self.configure_quota(kafka, self.producer_quota, 
self.consumer_quota, ['users', None, 'clients', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['users', None])
+                self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, 
QuotaConfig.LARGE_QUOTA, ['clients', None])
+
+    def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, 
entity_args):
+        node = kafka.nodes[0]
+        cmd = "%s --zookeeper %s --alter --add-config 
producer_byte_rate=%d,consumer_byte_rate=%d" % \
+              (kafka.path.script("kafka-configs.sh", node), 
kafka.zk.connect_setting(), producer_byte_rate, consumer_byte_rate)
+        cmd += " --entity-type " + entity_args[0] + 
self.entity_name_opt(entity_args[1])
+        if len(entity_args) > 2:
+            cmd += " --entity-type " + entity_args[2] + 
self.entity_name_opt(entity_args[3])
+        node.account.ssh(cmd)
+
+    def entity_name_opt(self, name):
+        return " --entity-default" if name is None else " --entity-name " + 
name
 
 class QuotaTest(Test):
     """
@@ -36,22 +100,16 @@ class QuotaTest(Test):
         self.topic = 'test_topic'
         self.logger.info('use topic ' + self.topic)
 
-        # quota related parameters
-        self.quota_config = {'quota_producer_default': 2500000,
-                             'quota_consumer_default': 2000000,
-                             'quota_producer_bytes_per_second_overrides': 
'overridden_id=3750000',
-                             'quota_consumer_bytes_per_second_overrides': 
'overridden_id=3000000'}
         self.maximum_client_deviation_percentage = 100.0
         self.maximum_broker_deviation_percentage = 5.0
-        self.num_records = 100000
+        self.num_records = 50000
         self.record_size = 3000
 
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol='PLAINTEXT',
-                                  interbroker_security_protocol='PLAINTEXT',
+                                  security_protocol='SSL', 
authorizer_class_name='',
+                                  interbroker_security_protocol='SSL',
                                   topics={self.topic: {'partitions': 6, 
'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
-                                  quota_config=self.quota_config,
                                   
jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
                                                     
'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
                                   jmx_attributes=['OneMinuteRate'])
@@ -66,24 +124,27 @@ class QuotaTest(Test):
         """Override this since we're adding services outside of the 
constructor"""
         return super(QuotaTest, self).min_cluster_size() + self.num_producers 
+ self.num_consumers
 
-    @parametrize(producer_id='default_id', producer_num=1, 
consumer_id='default_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, 
consumer_id='overridden_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, 
consumer_id='overridden_id', consumer_num=2)
-    def test_quota(self, producer_id='default_id', producer_num=1, 
consumer_id='default_id', consumer_num=1):
+    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, 
QuotaConfig.USER_CLIENT], override_quota=[True, False])
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, 
consumer_num=1):
+        self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
+        producer_client_id = self.quota_config.client_id
+        consumer_client_id = self.quota_config.client_id
+
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, 
record_size=self.record_size, throughput=-1, client_id=producer_id,
-            
jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % 
producer_id], jmx_attributes=['outgoing-byte-rate'])
+            topic=self.topic, num_records=self.num_records, 
record_size=self.record_size, throughput=-1, client_id=producer_client_id,
+            
jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % 
producer_client_id], jmx_attributes=['outgoing-byte-rate'])
 
         producer.run()
 
         # Consume all messages
         consumer = ConsoleConsumer(self.test_context, consumer_num, 
self.kafka, self.topic,
-            new_consumer=False,
-            consumer_timeout_ms=60000, client_id=consumer_id,
-            
jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s'
 % consumer_id],
-            jmx_attributes=['OneMinuteRate'])
+            new_consumer=True,
+            consumer_timeout_ms=60000, client_id=consumer_client_id,
+            
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s'
 % consumer_client_id],
+            jmx_attributes=['bytes-consumed-rate'])
         consumer.run()
 
         for idx, messages in consumer.messages_consumed.iteritems():
@@ -118,7 +179,7 @@ class QuotaTest(Test):
         # validate that maximum_producer_throughput <= producer_quota * (1 + 
maximum_client_deviation_percentage/100)
         producer_attribute_name = 
'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % 
producer.client_id
         producer_maximum_bps = 
producer.maximum_jmx_value[producer_attribute_name]
-        producer_quota_bps = self.get_producer_quota(producer.client_id)
+        producer_quota_bps = self.quota_config.producer_quota
         self.logger.info('producer has maximum throughput %.2f bps with 
producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
         if producer_maximum_bps > 
producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
             success = False
@@ -136,9 +197,9 @@ class QuotaTest(Test):
                    (broker_maximum_byte_in_bps, producer_quota_bps, 
self.maximum_broker_deviation_percentage)
 
         # validate that maximum_consumer_throughput <= consumer_quota * (1 + 
maximum_client_deviation_percentage/100)
-        consumer_attribute_name = 
'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate'
 % consumer.client_id
+        consumer_attribute_name = 
'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s:bytes-consumed-rate'
 % consumer.client_id
         consumer_maximum_bps = 
consumer.maximum_jmx_value[consumer_attribute_name]
-        consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
+        consumer_quota_bps = self.quota_config.consumer_quota
         self.logger.info('consumer has maximum throughput %.2f bps with 
consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
         if consumer_maximum_bps > 
consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
             success = False
@@ -157,14 +218,3 @@ class QuotaTest(Test):
 
         return success, msg
 
-    def get_producer_quota(self, client_id):
-        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value 
in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
-        if client_id in overridden_quotas:
-            return float(overridden_quotas[client_id])
-        return self.quota_config['quota_producer_default']
-
-    def get_consumer_quota(self, client_id):
-        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value 
in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
-        if client_id in overridden_quotas:
-            return float(overridden_quotas[client_id])
-        return self.quota_config['quota_consumer_default']

Reply via email to