This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 65a8061 KAFKA-6944; Add system tests testing the new throttling
behavior using older clients/brokers
65a8061 is described below
commit 65a8061f9f6af73090e98ccf1f2d8abf70cc70dc
Author: Jon Lee <[email protected]>
AuthorDate: Wed Jun 27 16:49:12 2018 -0700
KAFKA-6944; Add system tests testing the new throttling behavior using
older clients/brokers
Added two additional test cases to quota_test.py, which run between brokers
and clients with different throttling behaviors. More specifically,
1. clients with new throttling behavior (i.e., post-KIP-219) and brokers
with old throttling behavior (i.e., pre-KIP-219)
2. clients with old throttling behavior and brokers with new throttling
behavior
Author: Jon Lee <[email protected]>
Author: Dong Lin <[email protected]>
Reviewers: Dong Lin <[email protected]>
Closes #5294 from jonlee2/kafka-6944
---
tests/kafkatest/tests/client/quota_test.py | 23 +++++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
diff --git a/tests/kafkatest/tests/client/quota_test.py
b/tests/kafkatest/tests/client/quota_test.py
index 47a6a96..c084e08 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -21,6 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.version import DEV_BRANCH, V_1_1_0
class QuotaConfig(object):
CLIENT_ID = 'client-id'
@@ -119,7 +120,6 @@ class QuotaTest(Test):
def setUp(self):
self.zk.start()
- self.kafka.start()
def min_cluster_size(self):
"""Override this since we're adding services outside of the
constructor"""
@@ -128,15 +128,30 @@ class QuotaTest(Test):
@cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER,
QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
- def test_quota(self, quota_type, override_quota=True, producer_num=1,
consumer_num=1):
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID,
old_broker_throttling_behavior=True)
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID,
old_client_throttling_behavior=True)
+ def test_quota(self, quota_type, override_quota=True, producer_num=1,
consumer_num=1,
+ old_broker_throttling_behavior=False,
old_client_throttling_behavior=False):
+ # Old (pre-2.0) throttling behavior for broker throttles before
sending a response to the client.
+ if old_broker_throttling_behavior:
+ self.kafka.set_version(V_1_1_0)
+ self.kafka.start()
+
self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
producer_client_id = self.quota_config.client_id
consumer_client_id = self.quota_config.client_id
+ # Old (pre-2.0) throttling behavior for client does not throttle upon
receiving a response with a non-zero throttle time.
+ if old_client_throttling_behavior:
+ client_version = V_1_1_0
+ else:
+ client_version = DEV_BRANCH
+
# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka,
- topic=self.topic, num_records=self.num_records,
record_size=self.record_size, throughput=-1, client_id=producer_client_id)
+ topic=self.topic, num_records=self.num_records,
record_size=self.record_size, throughput=-1,
+ client_id=producer_client_id, version=client_version)
producer.run()
@@ -144,7 +159,7 @@ class QuotaTest(Test):
consumer = ConsoleConsumer(self.test_context, consumer_num,
self.kafka, self.topic,
consumer_timeout_ms=60000, client_id=consumer_client_id,
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s'
% consumer_client_id],
- jmx_attributes=['bytes-consumed-rate'])
+ jmx_attributes=['bytes-consumed-rate'], version=client_version)
consumer.run()
for idx, messages in consumer.messages_consumed.iteritems():