Repository: kafka Updated Branches: refs/heads/trunk c36b5b7f6 -> 8c7e66313
KAFKA-5182: Reduce rebalance timeouts in request quota test Reduce rebalance and session timeouts for join requests to trigger throttling in the request quota test. Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Damian Guy <damian....@gmail.com> Closes #3057 from rajinisivaram/KAFKA-5182-quotatest Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c7e6631 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c7e6631 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c7e6631 Branch: refs/heads/trunk Commit: 8c7e6631308ae986bd60df0b0761f68c777fadff Parents: c36b5b7 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Wed May 17 09:41:19 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Wed May 17 09:41:19 2017 -0400 ---------------------------------------------------------------------- .../unit/kafka/server/RequestQuotaTest.scala | 78 +++++++++----------- 1 file changed, 33 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8c7e6631/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 4cf3e7d..1c496cd 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -57,13 +57,14 @@ class RequestQuotaTest extends BaseRequestTest { properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") + properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName) properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName) } @Before override def setUp() { - RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS super.setUp() @@ -86,54 +87,41 @@ class RequestQuotaTest extends BaseRequestTest { @After override def tearDown() { - try { - executor.shutdownNow() - } finally { - super.tearDown() - } + try executor.shutdownNow() + finally super.tearDown() } @Test def testResponseThrottleTime() { + for (apiKey <- RequestQuotaTest.ClientActions) + submitTest(apiKey, () => checkRequestThrottleTime(apiKey)) - for (apiKey <- RequestQuotaTest.ClientActions) { - val builder = requestBuilder(apiKey) - submitTest(apiKey, () => { - checkRequestThrottleTime(apiKey) - }) - } waitAndCheckResults() } @Test def testUnthrottledClient() { + for (apiKey <- RequestQuotaTest.ClientActions) + submitTest(apiKey, () => checkUnthrottledClient(apiKey)) - for (apiKey <- RequestQuotaTest.ClientActions) { - val builder = requestBuilder(apiKey) - submitTest(apiKey, () => { - checkUnthrottledClient(apiKey) - }) - } waitAndCheckResults() } @Test def testExemptRequestTime() { - - for (apiKey <- RequestQuotaTest.ClusterActions) { + for (apiKey <- RequestQuotaTest.ClusterActions) submitTest(apiKey, () => checkExemptRequestMetric(apiKey)) - } + waitAndCheckResults() } @Test def testUnauthorizedThrottle() { - RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal - for (apiKey <- ApiKeys.values) { + for (apiKey <- ApiKeys.values) submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) - } + waitAndCheckResults() } @@ -171,19 +159,19 @@ class RequestQuotaTest extends BaseRequestTest { private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = { apiKey match { case ApiKeys.PRODUCE => - new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000, + new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000, collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava) case ApiKeys.FETCH => - val partitionMap = new LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] - partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100)) - requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap) + val partitionMap = new LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + partitionMap.put(tp, new FetchRequest.PartitionData(0, 0, 100)) + FetchRequest.Builder.forConsumer(0, 0, partitionMap) case ApiKeys.METADATA => - new requests.MetadataRequest.Builder(List(topic).asJava) + new MetadataRequest.Builder(List(topic).asJava) case ApiKeys.LIST_OFFSETS => - requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) .setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava) case ApiKeys.LEADER_AND_ISR => @@ -197,29 +185,29 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.UPDATE_METADATA_KEY => val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava val securityProtocol = SecurityProtocol.PLAINTEXT - val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, - Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, + val brokers = Set(new UpdateMetadataRequest.Broker(brokerId, + Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava - new requests.UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers) + new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers) case ApiKeys.CONTROLLED_SHUTDOWN_KEY => - new requests.ControlledShutdownRequest.Builder(brokerId) + new ControlledShutdownRequest.Builder(brokerId) case ApiKeys.OFFSET_COMMIT => - new requests.OffsetCommitRequest.Builder("test-group", - Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava). + new OffsetCommitRequest.Builder("test-group", + Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava). setMemberId("").setGenerationId(1).setRetentionTime(1000) case ApiKeys.OFFSET_FETCH => - new requests.OffsetFetchRequest.Builder("test-group", List(tp).asJava) + new OffsetFetchRequest.Builder("test-group", List(tp).asJava) case ApiKeys.FIND_COORDINATOR => - new requests.FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group") + new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group") case ApiKeys.JOIN_GROUP => - new JoinGroupRequest.Builder("test-join-group", 10000, "", "consumer", - List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) - .setRebalanceTimeout(60000) + new JoinGroupRequest.Builder("test-join-group", 200, "", "consumer", + List(new JoinGroupRequest.ProtocolMetadata("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava) + .setRebalanceTimeout(100) case ApiKeys.HEARTBEAT => new HeartbeatRequest.Builder("test-group", 1, "") @@ -337,12 +325,12 @@ class RequestQuotaTest extends BaseRequestTest { private def responseThrottleTime(apiKey: ApiKeys, response: Struct): Int = { apiKey match { case ApiKeys.PRODUCE => new ProduceResponse(response).getThrottleTime - case ApiKeys.FETCH => new requests.FetchResponse(response).throttleTimeMs + case ApiKeys.FETCH => new FetchResponse(response).throttleTimeMs case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs - case ApiKeys.OFFSET_COMMIT => new requests.OffsetCommitResponse(response).throttleTimeMs - case ApiKeys.OFFSET_FETCH => new requests.OffsetFetchResponse(response).throttleTimeMs - case ApiKeys.FIND_COORDINATOR => new requests.FindCoordinatorResponse(response).throttleTimeMs + case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs + case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs + case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs