Repository: kafka Updated Branches: refs/heads/trunk 249152062 -> 9815e18fe
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c3c37c1..bf7d4c1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,15 +43,20 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} -import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch} +import org.apache.kafka.common.record._ +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse +import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.requests.SaslHandshakeResponse +import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.clients.admin.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, Resource => AdminResource, ResourceType => AdminResourceType} import scala.collection._ import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.util.{Failure, Success, Try} /** @@ -118,6 +123,9 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.END_TXN => handleEndTxnRequest(request) case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request) case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request) + case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) + case ApiKeys.CREATE_ACLS => handleCreateAcls(request) + case ApiKeys.DELETE_ACLS => handleDeleteAcls(request) } } catch { case e: FatalExitError => throw e @@ -1655,6 +1663,217 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleDescribeAcls(request: RequestChannel.Request): Unit = { + authorizeClusterAction(request) + val describeAclsRequest = request.body[DescribeAclsRequest] + authorizer match { + case None => + def createResponse(throttleTimeMs: Int): AbstractResponse = + new DescribeAclsResponse(throttleTimeMs, new SecurityDisabledException( + "No Authorizer is configured on the broker."), Collections.emptySet[AclBinding]); + sendResponseMaybeThrottle(request, createResponse) + case Some(auth) => + val filter = describeAclsRequest.filter() + var returnedAcls = new util.ArrayList[AclBinding] + val aclMap : Map[Resource, Set[Acl]] = auth.getAcls() + aclMap.foreach { + case (resource, acls) => { + acls.foreach { + case (acl) => { + val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), + new AccessControlEntry(acl.principal.toString(), acl.host.toString(), acl.operation.toJava, acl.permissionType.toJava)) + if (filter.matches(fixture)) + returnedAcls.add(fixture) + } + } + } + } + def createResponse(throttleTimeMs: Int): AbstractResponse = + new DescribeAclsResponse(throttleTimeMs, null, returnedAcls) + sendResponseMaybeThrottle(request, createResponse) + } + } + + /** + * Convert an ACL binding filter to a Scala object. + * All ACL and resource fields must be specified (no UNKNOWN, ANY, or null fields are allowed.) + * + * @param filter The binding filter as a Java object. + * @return The binding filter as a scala object, or an exception if there was an error + * converting the Java object. + */ + def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = { + filter.resourceFilter().resourceType() match { + case AdminResourceType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN resource type")) + case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type")) + case _ => {} + } + var resourceType: ResourceType = null + try { + resourceType = ResourceType.fromString(filter.resourceFilter().resourceType().toString) + } catch { + case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type")) + } + var principal: KafkaPrincipal = null + try { + principal = KafkaPrincipal.fromString(filter.entryFilter().principal()) + } catch { + case throwable: Throwable => return Failure(new InvalidRequestException("Invalid principal")) + } + filter.entryFilter().operation() match { + case AclOperation.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN operation type")) + case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type")) + case _ => {} + } + val operation = Operation.fromJava(filter.entryFilter().operation()) match { + case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage)) + case Success(op) => op + } + filter.entryFilter().permissionType() match { + case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid UNKNOWN permission type") + case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY permission type") + case _ => {} + } + val permissionType = PermissionType.fromJava(filter.entryFilter.permissionType) match { + case Failure(throwable) => return Failure(new InvalidRequestException(throwable.getMessage)) + case Success(perm) => perm + } + return Success((Resource(resourceType, filter.resourceFilter().name()), Acl(principal, permissionType, + filter.entryFilter().host(), operation))) + } + + /** + * Convert a Scala ACL binding to a Java object. + * + * @param acl The binding as a Scala object. + * @return The binding as a Java object. + */ + def toJava(acl: (Resource, Acl)) : AclBinding = { + acl match { + case (resource, acl) => + val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name) + val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString, + acl.operation.toJava, acl.permissionType.toJava) + return new AclBinding(adminResource, entry) + } + } + + def handleCreateAcls(request: RequestChannel.Request): Unit = { + authorizeClusterAction(request) + val createAclsRequest = request.body[CreateAclsRequest] + authorizer match { + case None => + def createResponse(throttleTimeMs: Int): AbstractResponse = + createAclsRequest.getErrorResponse(throttleTimeMs, + new SecurityDisabledException("No Authorizer is configured on the broker.")) + sendResponseMaybeThrottle(request, createResponse) + case Some(auth) => + val errors = mutable.HashMap[Int, Throwable]() + var creations = ListBuffer[(Resource, Acl)]() + for (i <- 0 to createAclsRequest.aclCreations().size() - 1) { + val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter) + result match { + case Failure(throwable) => errors.put(i, throwable) + case Success((resource, acl)) => try { + if (resource.resourceType.equals(Cluster) && + !resource.name.equals(Resource.ClusterResourceName)) + throw new InvalidRequestException("The only valid name for the CLUSTER resource is " + + Resource.ClusterResourceName) + if (resource.name.isEmpty()) + throw new InvalidRequestException("Invalid empty resource name") + auth.addAcls(immutable.Set(acl), resource) + } catch { + case throwable : Throwable => errors.put(i, throwable) + } + } + } + var aclCreationResults = new java.util.ArrayList[AclCreationResponse] + for (i <- 0 to createAclsRequest.aclCreations().size() - 1) { + errors.get(i) match { + case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable)) + case None => aclCreationResults.add(new AclCreationResponse(null)) + } + } + def createResponse(throttleTimeMs: Int): AbstractResponse = + new CreateAclsResponse(throttleTimeMs, aclCreationResults) + sendResponseMaybeThrottle(request, createResponse) + } + } + + def handleDeleteAcls(request: RequestChannel.Request): Unit = { + authorizeClusterAction(request) + val deleteAclsRequest = request.body[DeleteAclsRequest] + authorizer match { + case None => + def createResponse(throttleTimeMs: Int): AbstractResponse = + deleteAclsRequest.getErrorResponse(throttleTimeMs, + new SecurityDisabledException("No Authorizer is configured on the broker.")) + sendResponseMaybeThrottle(request, createResponse) + case Some(auth) => + val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]() + var toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]() + for (i <- 0 to deleteAclsRequest.filters().size - 1) { + toDelete.put(i, new ListBuffer[(Resource, Acl)]()) + } + if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) { + // Delete based on filters that may match more than one ACL. + val aclMap : Map[Resource, Set[Acl]] = auth.getAcls() + aclMap.foreach { + case (resource, acls) => { + acls.foreach { + case (acl) => { + val binding = new AclBinding(new AdminResource(AdminResourceType. + fromString(resource.resourceType.toString), resource.name), + new AccessControlEntry(acl.principal.toString(), acl.host.toString(), + acl.operation.toJava, acl.permissionType.toJava)) + for (i <- 0 to deleteAclsRequest.filters().size - 1) { + val filter = deleteAclsRequest.filters().get(i) + if (filter.matches(binding)) { + toDelete.get(i).get += ((resource, acl)) + } + } + } + } + } + } + } else { + // Delete based on a list of ACL fixtures. + for (i <- 0 to deleteAclsRequest.filters().size - 1) { + toScala(deleteAclsRequest.filters().get(i)) match { + case Failure(throwable) => filterResponseMap.put(i, + new AclFilterResponse(throwable, Collections.emptySet[AclDeletionResult]())) + case Success(fixture) => toDelete.put(i, ListBuffer(fixture)) + } + } + } + for (i <- toDelete.keys) { + val deletionResults = new util.ArrayList[AclDeletionResult]() + for (acls <- toDelete.get(i)) { + for ((resource, acl) <- acls) { + try { + if (auth.removeAcls(immutable.Set(acl), resource)) { + deletionResults.add(new AclDeletionResult(null, toJava((resource, acl)))) + } + } catch { + case throwable: Throwable => deletionResults.add(new AclDeletionResult( + new UnknownServerException("Failed to delete ACL: " + throwable.toString), + toJava((resource, acl)))) + } + } + } + filterResponseMap.put(i, new AclFilterResponse(null, deletionResults)) + } + val filterResponses = new util.ArrayList[AclFilterResponse] + for (i <- 0 to deleteAclsRequest.filters().size() - 1) { + filterResponses.add(filterResponseMap.getOrElse(i, + new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]()))) + } + def createResponse(throttleTimeMs: Int): AbstractResponse = + new DeleteAclsResponse(throttleTimeMs, filterResponses) + sendResponseMaybeThrottle(request, createResponse) + } + } + def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = { val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest] val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition() http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index ed513ea..f381b15 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -17,7 +17,7 @@ package kafka.api import java.util -import java.util.Properties +import java.util.{Collections, Properties} import java.util.concurrent.ExecutionException import org.apache.kafka.common.utils.Utils @@ -27,7 +27,7 @@ import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, TestUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture -import org.apache.kafka.common.errors.TopicExistsException +import org.apache.kafka.common.errors.{SecurityDisabledException, TopicExistsException} import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.MetadataResponse @@ -156,6 +156,25 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin client.close() } + val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); + + /** + * Test that ACL operations are not possible when the authorizer is disabled. + * Also see {@link kafka.api.SaslSslAdminClientIntegrationTest} for tests of ACL operations + * when the authorizer is enabled. + */ + @Test + def testAclOperations(): Unit = { + client = AdminClient.create(createConfig()) + assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException]) + assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(), + classOf[SecurityDisabledException]) + assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(), + classOf[SecurityDisabledException]) + client.close() + } + override def generateConfigs() = { val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index 1bfcdf2..cb43b09 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -14,13 +14,22 @@ package kafka.api import java.io.File +import kafka.security.auth.SimpleAclAuthorizer import org.apache.kafka.common.protocol.SecurityProtocol import kafka.server.KafkaConfig -import kafka.utils.JaasTestUtils -import org.junit.{After, Before} +import kafka.utils.{JaasTestUtils, TestUtils} +import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, AdminClient, CreateAclsOptions, DeleteAclsOptions, Resource, ResourceFilter, ResourceType} +import org.apache.kafka.common.errors.InvalidRequestException +import org.junit.Assert.assertEquals +import org.junit.{After, Assert, Before, Test} + +import scala.collection.JavaConverters._ class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup { this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName()) + this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") + override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) @@ -35,5 +44,77 @@ class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest super.tearDown() closeSasl() } - + + val ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)); + val ACL3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)); + val ACL_UNKNOWN = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)); + + /** + * Test that ACL operations are not possible when the authorizer is disabled. + * Also see {@link kafka.api.KafkaAdminClientSecureIntegrationTest} for tests of ACL operations + * when the authorizer is enabled. + */ + @Test + override def testAclOperations(): Unit = { + client = AdminClient.create(createConfig()) + assertEquals(0, client.describeAcls(AclBindingFilter.ANY).all().get().size()) + val results = client.createAcls(List(ACL2, ACL3).asJava) + assertEquals(Set(ACL2, ACL3), results.results().keySet().asScala) + results.results().values().asScala.foreach(value => value.get) + val results2 = client.createAcls(List(ACL_UNKNOWN).asJava) + assertEquals(Set(ACL_UNKNOWN), results2.results().keySet().asScala) + assertFutureExceptionTypeEquals(results2.all(), classOf[InvalidRequestException]) + val results3 = client.deleteAcls(List(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter).asJava) + assertEquals(Set(ACL1.toFilter, ACL2.toFilter, ACL3.toFilter), results3.results().keySet().asScala) + assertEquals(0, results3.results.get(ACL1.toFilter).get.acls.size()) + assertEquals(Set(ACL2), results3.results.get(ACL2.toFilter).get.acls.asScala.map(result => result.acl()).toSet) + assertEquals(Set(ACL3), results3.results.get(ACL3.toFilter).get.acls.asScala.map(result => result.acl()).toSet) + client.close() + } + + def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = { + TestUtils.waitUntilTrue(() => { + val results = client.describeAcls(filter).all().get() + acls == results.asScala.toSet + }, "timed out waiting for ACLs") + } + + @Test + def testAclOperations2(): Unit = { + client = AdminClient.create(createConfig()) + val results = client.createAcls(List(ACL2, ACL2).asJava) + assertEquals(Set(ACL2, ACL2), results.results().keySet().asScala) + results.all().get() + waitForDescribeAcls(client, AclBindingFilter.ANY, Set(ACL2)) + + val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.CLUSTER, null), AccessControlEntryFilter.ANY) + val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY) + + waitForDescribeAcls(client, filterA, Set()) + + val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions()) + assertEquals(Set(filterA, filterB), results2.results().keySet().asScala) + assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(result => result.acl()).toSet) + assertEquals(Set(ACL2), results2.results.get(filterB).get.acls.asScala.map(result => result.acl()).toSet) + + waitForDescribeAcls(client, filterB, Set()) + + client.close() + } + + @Test + def testAttemptToCreateInvalidAcls(): Unit = { + client = AdminClient.create(createConfig()) + val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) + val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) + val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions()) + assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results().keySet().asScala) + assertFutureExceptionTypeEquals(results.results().get(clusterAcl), classOf[InvalidRequestException]) + assertFutureExceptionTypeEquals(results.results().get(emptyResourceNameAcl), classOf[InvalidRequestException]) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 1c496cd..5cb6f71 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -16,12 +16,14 @@ package kafka.server import java.net.Socket import java.nio.ByteBuffer -import java.util.{LinkedHashMap, Properties} +import java.util.{Collections, LinkedHashMap, Properties} import java.util.concurrent.{Executors, Future, TimeUnit} + import kafka.admin.AdminUtils import kafka.network.RequestChannel.Session import kafka.security.auth._ import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer} @@ -29,10 +31,12 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.record._ import org.apache.kafka.common.requests +import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal} import org.junit.Assert._ -import org.junit.{Before, After, Test} +import org.junit.{After, Before, Test} + import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -260,6 +264,19 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.TXN_OFFSET_COMMIT => new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava) + case ApiKeys.DESCRIBE_ACLS => + new DescribeAclsRequest.Builder(AclBindingFilter.ANY) + + case ApiKeys.CREATE_ACLS => + new CreateAclsRequest.Builder(Collections.singletonList(new AclCreation(new AclBinding( + new AdminResource(AdminResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))) + + case ApiKeys.DELETE_ACLS => + new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter( + new ResourceFilter(AdminResourceType.TOPIC, null), + new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))) + case key => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -346,6 +363,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs case ApiKeys.TXN_OFFSET_COMMIT => new TxnOffsetCommitResponse(response).throttleTimeMs + case ApiKeys.DESCRIBE_ACLS => new DescribeAclsResponse(response).throttleTimeMs + case ApiKeys.CREATE_ACLS => new CreateAclsResponse(response).throttleTimeMs + case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } }
