Repository: kafka
Updated Branches:
  refs/heads/trunk c36b5b7f6 -> 8c7e66313


KAFKA-5182: Reduce rebalance timeouts in request quota test

Reduce rebalance and session timeouts for join requests to trigger throttling 
in the request quota test.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Damian Guy <damian....@gmail.com>

Closes #3057 from rajinisivaram/KAFKA-5182-quotatest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c7e6631
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c7e6631
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c7e6631

Branch: refs/heads/trunk
Commit: 8c7e6631308ae986bd60df0b0761f68c777fadff
Parents: c36b5b7
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Wed May 17 09:41:19 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Wed May 17 09:41:19 2017 -0400

----------------------------------------------------------------------
 .../unit/kafka/server/RequestQuotaTest.scala    | 78 +++++++++-----------
 1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c7e6631/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4cf3e7d..1c496cd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -57,13 +57,14 @@ class RequestQuotaTest extends BaseRequestTest {
     properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
+    properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName)
     properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
   }
 
   @Before
   override def setUp() {
-
     RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
     super.setUp()
 
@@ -86,54 +87,41 @@ class RequestQuotaTest extends BaseRequestTest {
 
   @After
   override def tearDown() {
-    try {
-      executor.shutdownNow()
-    } finally {
-      super.tearDown()
-    }
+    try executor.shutdownNow()
+    finally super.tearDown()
   }
 
   @Test
   def testResponseThrottleTime() {
+    for (apiKey <- RequestQuotaTest.ClientActions)
+      submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
 
-    for (apiKey <- RequestQuotaTest.ClientActions) {
-      val builder = requestBuilder(apiKey)
-      submitTest(apiKey, () => {
-        checkRequestThrottleTime(apiKey)
-      })
-    }
     waitAndCheckResults()
   }
 
   @Test
   def testUnthrottledClient() {
+    for (apiKey <- RequestQuotaTest.ClientActions)
+      submitTest(apiKey, () => checkUnthrottledClient(apiKey))
 
-    for (apiKey <- RequestQuotaTest.ClientActions) {
-      val builder = requestBuilder(apiKey)
-      submitTest(apiKey, () => {
-        checkUnthrottledClient(apiKey)
-      })
-    }
     waitAndCheckResults()
   }
 
   @Test
   def testExemptRequestTime() {
-
-    for (apiKey <- RequestQuotaTest.ClusterActions) {
+    for (apiKey <- RequestQuotaTest.ClusterActions)
       submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
-    }
+
     waitAndCheckResults()
   }
 
   @Test
   def testUnauthorizedThrottle() {
-
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.values) {
+    for (apiKey <- ApiKeys.values)
       submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
-    }
+
     waitAndCheckResults()
   }
 
@@ -171,19 +159,19 @@ class RequestQuotaTest extends BaseRequestTest {
   private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
     apiKey match {
         case ApiKeys.PRODUCE =>
-          new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+          new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
             collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
 
         case ApiKeys.FETCH =>
-          val partitionMap = new LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
-          partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
-          requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap)
+          val partitionMap = new LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+          partitionMap.put(tp, new FetchRequest.PartitionData(0, 0, 100))
+          FetchRequest.Builder.forConsumer(0, 0, partitionMap)
 
         case ApiKeys.METADATA =>
-          new requests.MetadataRequest.Builder(List(topic).asJava)
+          new MetadataRequest.Builder(List(topic).asJava)
 
         case ApiKeys.LIST_OFFSETS =>
-          requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+          ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
             .setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.LEADER_AND_ISR =>
@@ -197,29 +185,29 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.UPDATE_METADATA_KEY =>
           val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
           val securityProtocol = SecurityProtocol.PLAINTEXT
-          val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
-            Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
+          val brokers = Set(new UpdateMetadataRequest.Broker(brokerId,
+            Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
             ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
-          new requests.UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
+          new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
 
         case ApiKeys.CONTROLLED_SHUTDOWN_KEY =>
-          new requests.ControlledShutdownRequest.Builder(brokerId)
+          new ControlledShutdownRequest.Builder(brokerId)
 
         case ApiKeys.OFFSET_COMMIT =>
-          new requests.OffsetCommitRequest.Builder("test-group",
-            Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
+          new OffsetCommitRequest.Builder("test-group",
+            Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
             setMemberId("").setGenerationId(1).setRetentionTime(1000)
 
         case ApiKeys.OFFSET_FETCH =>
-          new requests.OffsetFetchRequest.Builder("test-group", List(tp).asJava)
+          new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
 
         case ApiKeys.FIND_COORDINATOR =>
-          new requests.FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+          new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
 
         case ApiKeys.JOIN_GROUP =>
-          new JoinGroupRequest.Builder("test-join-group", 10000, "", "consumer",
-            List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
-           .setRebalanceTimeout(60000)
+          new JoinGroupRequest.Builder("test-join-group", 200, "", "consumer",
+            List(new JoinGroupRequest.ProtocolMetadata("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava)
+           .setRebalanceTimeout(100)
 
         case ApiKeys.HEARTBEAT =>
           new HeartbeatRequest.Builder("test-group", 1, "")
@@ -337,12 +325,12 @@ class RequestQuotaTest extends BaseRequestTest {
   private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = {
     apiKey match {
       case ApiKeys.PRODUCE => new ProduceResponse(response).getThrottleTime
-      case ApiKeys.FETCH => new requests.FetchResponse(response).throttleTimeMs
+      case ApiKeys.FETCH => new FetchResponse(response).throttleTimeMs
       case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
       case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
-      case ApiKeys.OFFSET_COMMIT => new requests.OffsetCommitResponse(response).throttleTimeMs
-      case ApiKeys.OFFSET_FETCH => new requests.OffsetFetchResponse(response).throttleTimeMs
-      case ApiKeys.FIND_COORDINATOR => new requests.FindCoordinatorResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
+      case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
+      case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
       case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
       case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
       case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs

Reply via email to