Repository: kafka Updated Branches: refs/heads/trunk 56623efd7 -> b661d3b8a
MINOR: Small refactor of request quotas handling in KafkaApis - Avoid unnecessary inner methods - Remove redundant parameter in `sendResponseExemptThrottle` - Go through `sendResponseExemptThrottle` for produce requests with acks=0 - Tighten how we handle cases where there's no response Author: Ismael Juma <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #3087 from ijuma/kafka-apis-improvements Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b661d3b8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b661d3b8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b661d3b8 Branch: refs/heads/trunk Commit: b661d3b8ab37fc63b44145ba29d129b07c1582b9 Parents: 56623ef Author: Ismael Juma <[email protected]> Authored: Fri May 19 00:14:08 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Fri May 19 00:14:08 2017 +0100 ---------------------------------------------------------------------- .../kafka/api/ControlledShutdownRequest.scala | 2 +- .../src/main/scala/kafka/api/FetchRequest.scala | 2 +- .../kafka/api/GroupCoordinatorRequest.scala | 4 +- .../scala/kafka/api/OffsetCommitRequest.scala | 2 +- .../scala/kafka/api/OffsetFetchRequest.scala | 2 +- .../main/scala/kafka/api/OffsetRequest.scala | 2 +- .../main/scala/kafka/api/ProducerRequest.scala | 4 +- .../scala/kafka/api/TopicMetadataRequest.scala | 2 +- .../scala/kafka/network/RequestChannel.scala | 40 +- .../main/scala/kafka/network/SocketServer.scala | 12 +- .../src/main/scala/kafka/server/KafkaApis.scala | 390 +++++++++---------- .../unit/kafka/network/SocketServerTest.scala | 16 +- 12 files changed, 231 insertions(+), 247 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index 46ae1e7..a0ad6cf 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -69,7 +69,7 @@ case class ControlledShutdownRequest(versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = ControlledShutdownResponse(correlationId, Errors.forException(e), Set.empty[TopicAndPartition]) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean = false): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 39da605..60284f7 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -208,7 +208,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, } val errorResponse = new JFetchResponse(responseData, 0) // Magic value does not matter here because the message set is empty - requestChannel.sendResponse(new RequestChannel.Response(request, errorResponse)) + requestChannel.sendResponse(RequestChannel.Response(request, errorResponse)) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala index d99474d..b3616fa 100644 --- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala +++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala @@ -65,7 +65,7 @@ case class GroupCoordinatorRequest(group: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { // return ConsumerCoordinatorNotAvailable for all uncaught errors val errorResponse = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, correlationId) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } def describe(details: Boolean) = { @@ -77,4 +77,4 @@ case class GroupCoordinatorRequest(group: String, consumerMetadataRequest.append("; Group: " + group) consumerMetadataRequest.toString() } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index b9693f6..e598500 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -165,7 +165,7 @@ case class OffsetCommitRequest(groupId: String, val commitStatus = requestInfo.mapValues(_ => error) val commitResponse = OffsetCommitResponse(commitStatus, correlationId) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, commitResponse))) } override def 
describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetFetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index 33d3795..310860f 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -104,7 +104,7 @@ case class OffsetFetchRequest(groupId: String, } val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId, error=thrownError) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/OffsetRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 879d60d..876022c 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -118,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e), Nil)) } val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/ProducerRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index bd48388..38fef5b 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -130,14 +130,14 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { if (request.body[org.apache.kafka.common.requests.ProduceRequest].acks == 0) { - requestChannel.closeConnection(request.processor, request) + requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction)) } else { val producerResponseStatus = data.map { case (topicAndPartition, _) => (topicAndPartition, ProducerResponseStatus(Errors.forException(e), -1l, Message.NoTimestamp)) } val errorResponse = ProducerResponse(correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 403152a..6bbcab5 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -64,7 +64,7 @@ case class TopicMetadataRequest(versionId: Short, topic => TopicMetadata(topic, Nil, Errors.forException(e)) } 
val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + requestChannel.sendResponse(Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 2d41869..bb19346 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time import org.apache.log4j.Logger -import scala.reflect.{ClassTag, classTag} +import scala.reflect.ClassTag object RequestChannel extends Logging { val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), @@ -195,15 +195,27 @@ object RequestChannel extends Logging { } } - case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) { + object Response { + + def apply(request: Request, responseSend: Send): Response = { + require(request != null, "request should be non null") + require(responseSend != null, "responseSend should be non null") + new Response(request, Some(responseSend), SendAction) + } + + def apply(request: Request, response: AbstractResponse): Response = { + require(request != null, "request should be non null") + require(response != null, "response should be non null") + apply(request, response.toSend(request.connectionId, request.header)) + } + + } + + case class Response(request: Request, responseSend: Option[Send], 
responseAction: ResponseAction) { request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds - def this(request: Request, responseSend: Send) = - this(request.processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction) - - def this(request: Request, response: AbstractResponse) = - this(request, response.toSend(request.connectionId, request.header)) + def processor: Int = request.processor } trait ResponseAction @@ -251,20 +263,6 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe onResponse(response.processor) } - /** No operation to take for the request, need to read more over the network */ - def noOperation(processor: Int, request: RequestChannel.Request) { - responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction)) - for(onResponse <- responseListeners) - onResponse(processor) - } - - /** Close the connection for the request */ - def closeConnection(processor: Int, request: RequestChannel.Request) { - responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction)) - for(onResponse <- responseListeners) - onResponse(processor) - } - /** Get the next request or block until specified time has elapsed */ def receiveRequest(timeout: Long): RequestChannel.Request = requestQueue.poll(timeout, TimeUnit.MILLISECONDS) http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 48d0233..414557e 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -33,7 +33,7 @@ import 
kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Selector => KSelector} +import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException @@ -466,7 +466,9 @@ private[kafka] class Processor(val id: Int, if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) selector.unmute(channelId) case RequestChannel.SendAction => - sendResponse(curr) + val responseSend = curr.responseSend.getOrElse( + throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr")) + sendResponse(curr, responseSend) case RequestChannel.CloseConnectionAction => updateRequestMetrics(curr.request) trace("Closing socket connection actively according to the response code.") @@ -479,16 +481,16 @@ private[kafka] class Processor(val id: Int, } /* `protected` for test usage */ - protected[network] def sendResponse(response: RequestChannel.Response) { + protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { trace(s"Socket server received response to send, registering for write and sending data: $response") - val channel = selector.channel(response.responseSend.destination) + val channel = selector.channel(responseSend.destination) // `channel` can be null if the selector closed the connection because it was idle for too long if (channel == null) { warn(s"Attempting to send response via channel for which there is no open connection, connection id $id") response.request.updateRequestMetrics(0L) } else { - selector.send(response.responseSend) + selector.send(responseSend) 
inflightResponses += (response.request.connectionId -> response) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index db15f72..1346fb3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -34,7 +34,6 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} -import kafka.network.RequestChannel.{Response, Session} import kafka.security.auth._ import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors._ @@ -167,11 +166,11 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange) val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava) - sendResponseExemptThrottle(request, new Response(request, leaderAndIsrResponse)) + sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse)) } else { val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap - def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, _ => + new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) } } catch { case e: FatalExitError => throw e @@ -201,11 +200,11 @@ class 
KafkaApis(val requestChannel: RequestChannel, } } val response = new StopReplicaResponse(error, result.asJava) - sendResponseExemptThrottle(request, new Response(request, response)) + sendResponseExemptThrottle(RequestChannel.Response(request, response)) } else { val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap - def createResponse(throttleTimeMs: Int): AbstractResponse = new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, _ => + new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) } replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() @@ -225,10 +224,9 @@ class KafkaApis(val requestChannel: RequestChannel, adminManager.tryCompleteDelayedTopicOperations(topic) } } - sendResponseExemptThrottle(request, new Response(request, new UpdateMetadataResponse(Errors.NONE))) + sendResponseExemptThrottle(RequestChannel.Response(request, new UpdateMetadataResponse(Errors.NONE))) } else { - def createResponse(throttleTimeMs: Int): AbstractResponse = new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)) } } @@ -245,7 +243,8 @@ class KafkaApis(val requestChannel: RequestChannel, case Success(partitionsRemaining) => val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, Errors.NONE, partitionsRemaining) - sendResponseExemptThrottle(request, new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse))) + sendResponseExemptThrottle(RequestChannel.Response(request, + new RequestOrResponseSend(request.connectionId, controlledShutdownResponse))) case Failure(throwable) => sendResponseExemptThrottle(request, () => 
controlledShutdownRequest.handleError(throwable, requestChannel, request)) } @@ -266,8 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel, val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition => (topicPartition, error) }.toMap - def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, results.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava)) } else { val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { case (topicPartition, _) => @@ -296,8 +294,8 @@ class KafkaApis(val requestChannel: RequestChannel, s"on partition $topicPartition failed due to ${error.exceptionName}") } } - def createResponse(throttleTimeMs: Int): AbstractResponse = new OffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) } if (authorizedTopics.isEmpty) @@ -364,7 +362,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - private def authorize(session: Session, operation: Operation, resource: Resource): Boolean = + private def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean = authorizer.forall(_.authorize(session, operation, resource)) /** @@ -375,9 +373,9 @@ class KafkaApis(val requestChannel: RequestChannel, val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId()))) - sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception())) + sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource)) - sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception())) + sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception)) else { val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) => @@ -413,25 +411,23 @@ class KafkaApis(val requestChannel: RequestChannel, // no operation needed if producer request.required.acks = 0; however, if there is any error in handling // the request, since no response is expected by the producer, the server will close socket server so that // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => - topicPartition -> status.error.exceptionName - }.mkString(", ") - info( - s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " + - s"from client id ${request.header.clientId} with ack=0\n" + - s"Topic and partition to exceptions: $exceptionsSummary" - ) - requestChannel.closeConnection(request.processor, request) - } else { - requestChannel.noOperation(request.processor, request) - } + val action = + if (errorInResponse) { + val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => + topicPartition -> status.error.exceptionName + }.mkString(", ") + info( + s"Closing connection due to error during produce request 
with correlation id ${request.header.correlationId} " + + s"from client id ${request.header.clientId} with ack=0\n" + + s"Topic and partition to exceptions: $exceptionsSummary" + ) + RequestChannel.CloseConnectionAction + } else RequestChannel.NoOpAction + sendResponseExemptThrottle(new RequestChannel.Response(request, None, action)) } else { - def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = { - new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs) - } - - sendResponseMaybeThrottle(request, createResponseCallback) + sendResponseMaybeThrottle(request, requestThrottleMs => + new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs) + ) } } @@ -552,16 +548,16 @@ class KafkaApis(val requestChannel: RequestChannel, brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) } - val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header) - new RequestChannel.Response(request, responseSend) - } - def sendResponseCallback(requestThrottleTimeMs: Int) { - requestChannel.sendResponse(createResponse(requestThrottleTimeMs)) + val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, + request.connectionId, request.header) + RequestChannel.Response(request, responseSend) } + if (fetchRequest.isFromFollower) - sendResponseExemptThrottle(request, createResponse(0)) + sendResponseExemptThrottle(createResponse(0)) else - sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback) + sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs => + requestChannel.sendResponse(createResponse(requestThrottleMs))) } // When this callback is triggered, the remote API call has completed. 
@@ -628,8 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel, else handleListOffsetRequestV1AndAbove(request) - def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) } private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { @@ -968,15 +963,14 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","), brokers.mkString(","), request.header.correlationId, request.header.clientId)) - def createResponse(throttleTimeMs: Int): AbstractResponse = new MetadataResponse( - throttleTimeMs, - brokers.map(_.getNode(request.listenerName)).asJava, - clusterId, - metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), - completeTopicMetadata.asJava - ) - - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new MetadataResponse( + requestThrottleMs, + brokers.map(_.getNode(request.listenerName)).asJava, + clusterId, + metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), + completeTopicMetadata.asJava + )) } /** @@ -989,11 +983,11 @@ class KafkaApis(val requestChannel: RequestChannel, def authorizeTopicDescribe(partition: TopicPartition) = authorize(request.session, Describe, new Resource(Topic, partition.topic)) - def createResponse(throttleTimeMs: Int): AbstractResponse = { + def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = // reject the request if not authorized to the group if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) - 
offsetFetchRequest.getErrorResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED) + offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED) else { if (header.apiVersion == 0) { val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala @@ -1023,17 +1017,17 @@ class KafkaApis(val requestChannel: RequestChannel, }.toMap val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap - new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) + new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) } else { // versions 1 and above read offsets from Kafka if (offsetFetchRequest.isAllPartitions) { val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId) if (error != Errors.NONE) - offsetFetchRequest.getErrorResponse(throttleTimeMs, error) + offsetFetchRequest.getErrorResponse(requestThrottleMs, error) else { // clients are not allowed to see offsets for topics that are not authorized for Describe val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) } - new OffsetFetchResponse(throttleTimeMs, Errors.NONE, authorizedPartitionData.asJava) + new OffsetFetchResponse(requestThrottleMs, Errors.NONE, authorizedPartitionData.asJava) } } else { val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala @@ -1041,10 +1035,10 @@ class KafkaApis(val requestChannel: RequestChannel, val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId, Some(authorizedPartitions)) if (error != Errors.NONE) - offsetFetchRequest.getErrorResponse(throttleTimeMs, error) + offsetFetchRequest.getErrorResponse(requestThrottleMs, error) else { val unauthorizedPartitionData = 
unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap - new OffsetFetchResponse(throttleTimeMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) + new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) } } } @@ -1060,9 +1054,8 @@ class KafkaApis(val requestChannel: RequestChannel, if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP && !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) { - - def createResponse(throttleTimeMs: Int): AbstractResponse = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new FindCoordinatorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)) } else { // TODO: Authorize by transactional id if coordinator type is TRANSACTION @@ -1082,19 +1075,19 @@ class KafkaApis(val requestChannel: RequestChannel, throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") } - def createResponse(throttleTimeMs: Int): AbstractResponse = { + def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseBody = if (topicMetadata.error != Errors.NONE) { - new FindCoordinatorResponse(throttleTimeMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala .find(_.partition == partition) - .map(_.leader()) + .map(_.leader) coordinatorEndpoint match { case Some(endpoint) if !endpoint.isEmpty => - new FindCoordinatorResponse(throttleTimeMs, Errors.NONE, endpoint) + new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint) case _ => - new FindCoordinatorResponse(throttleTimeMs, 
Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } } trace("Sending FindCoordinator response %s for correlation id %d to client %s." @@ -1108,7 +1101,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeGroupRequest(request: RequestChannel.Request) { val describeRequest = request.body[DescribeGroupsRequest] - val groups = describeRequest.groupIds().asScala.map { groupId => + val groups = describeRequest.groupIds.asScala.map { groupId => if (!authorize(request.session, Describe, new Resource(Group, groupId))) { groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED) } else { @@ -1123,19 +1116,18 @@ class KafkaApis(val requestChannel: RequestChannel, } }.toMap - def createResponse(throttleTimeMs: Int): AbstractResponse = new DescribeGroupsResponse(throttleTimeMs, groups.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava)) } def handleListGroupsRequest(request: RequestChannel.Request) { if (!authorize(request.session, Describe, Resource.ClusterResource)) { - def createResponse(throttleTimeMs: Int): AbstractResponse = ListGroupsResponse.fromError(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + ListGroupsResponse.fromError(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED)) } else { val (error, groups) = groupCoordinator.handleListGroups() val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } - def createResponse(throttleTimeMs: Int): AbstractResponse = new ListGroupsResponse(throttleTimeMs, error, allGroups.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => 
+ new ListGroupsResponse(requestThrottleMs, error, allGroups.asJava)) } } @@ -1145,8 +1137,8 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a join-group response def sendResponseCallback(joinResult: JoinGroupResult) { val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) } - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody = new JoinGroupResponse(throttleTimeMs, joinResult.error, joinResult.generationId, + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId, joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava) trace("Sending join group response %s for correlation id %d to client %s." @@ -1157,15 +1149,16 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) { - def createResponse(throttleTimeMs: Int): AbstractResponse = new JoinGroupResponse( - throttleTimeMs, - Errors.GROUP_AUTHORIZATION_FAILED, - JoinGroupResponse.UNKNOWN_GENERATION_ID, - JoinGroupResponse.UNKNOWN_PROTOCOL, - JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId - JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId - Collections.emptyMap()) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new JoinGroupResponse( + requestThrottleMs, + Errors.GROUP_AUTHORIZATION_FAILED, + JoinGroupResponse.UNKNOWN_GENERATION_ID, + JoinGroupResponse.UNKNOWN_PROTOCOL, + JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId + JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId + Collections.emptyMap()) + ) } else { // let the coordinator to handle join-group val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol => @@ -1187,8 +1180,8 @@ class KafkaApis(val requestChannel: RequestChannel, val 
syncGroupRequest = request.body[SyncGroupRequest] def sendResponseCallback(memberState: Array[Byte], error: Errors) { - def createResponse(throttleTimeMs: Int): AbstractResponse = new SyncGroupResponse(throttleTimeMs, error, ByteBuffer.wrap(memberState)) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState))) } if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) { @@ -1209,8 +1202,8 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a heartbeat response def sendResponseCallback(error: Errors) { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val response = new HeartbeatResponse(throttleTimeMs, error) + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val response = new HeartbeatResponse(requestThrottleMs, error) trace("Sending heartbeat response %s for correlation id %d to client %s." 
.format(response, request.header.correlationId, request.header.clientId)) response @@ -1219,10 +1212,9 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) { - def createResponse(throttleTimeMs: Int): AbstractResponse = new HeartbeatResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED) - sendResponseMaybeThrottle(request, createResponse) - } - else { + sendResponseMaybeThrottle(request, requestThrottleMs => + new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) + } else { // let the coordinator to handle heartbeat groupCoordinator.handleHeartbeat( heartbeatRequest.groupId(), @@ -1237,8 +1229,8 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a leave-group response def sendResponseCallback(error: Errors) { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val response = new LeaveGroupResponse(throttleTimeMs, error) + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val response = new LeaveGroupResponse(requestThrottleMs, error) trace("Sending leave group response %s for correlation id %d to client %s." 
.format(response, request.header.correlationId, request.header.clientId)) response @@ -1247,8 +1239,8 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) { - def createResponse(throttleTimeMs: Int): AbstractResponse = new LeaveGroupResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new LeaveGroupResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) } else { // let the coordinator to handle leave-group groupCoordinator.handleLeaveGroup( @@ -1259,8 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleSaslHandshakeRequest(request: RequestChannel.Request) { - def createResponse(throttleTimeMs: Int): AbstractResponse = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)) } def handleApiVersionsRequest(request: RequestChannel.Request) { @@ -1270,12 +1261,12 @@ class KafkaApis(val requestChannel: RequestChannel, // If this is considered to leak information about the broker version a workaround is to use SSL // with client authentication which is performed at an earlier stage of the connection where the // ApiVersionRequest is not available. 
- def sendResponseCallback(throttleTimeMs: Int) { + def sendResponseCallback(requestThrottleMs: Int) { val responseSend = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion)) - ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, throttleTimeMs).toSend(request.connectionId, request.header) + ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, requestThrottleMs).toSend(request.connectionId, request.header) else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header) - requestChannel.sendResponse(new RequestChannel.Response(request, responseSend)) + requestChannel.sendResponse(RequestChannel.Response(request, responseSend)) } sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback) } @@ -1284,8 +1275,8 @@ class KafkaApis(val requestChannel: RequestChannel, val createTopicsRequest = request.body[CreateTopicsRequest] def sendResponseCallback(results: Map[String, ApiError]): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava) + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = new CreateTopicsResponse(requestThrottleMs, results.asJava) trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") responseBody } @@ -1345,10 +1336,10 @@ class KafkaApis(val requestChannel: RequestChannel, } def sendResponseCallback(results: Map[String, Errors]): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { + def createResponse(requestThrottleMs: Int): AbstractResponse = { val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results - val responseBody = new 
DeleteTopicsResponse(throttleTimeMs, completeResults.asJava) + val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava) trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") responseBody } @@ -1404,8 +1395,8 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def createResponse(throttleTimeMs: Int): AbstractResponse = new DeleteRecordsResponse(throttleTimeMs, mergedResponseStatus.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new DeleteRecordsResponse(requestThrottleMs, mergedResponseStatus.asJava)) } if (authorizedForDeleteTopics.isEmpty) @@ -1425,12 +1416,13 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, Write, Resource.ProducerIdResource)) { - sendResponseMaybeThrottle(request, (throttleTime: Int) => new InitProducerIdResponse(throttleTime, Errors.PRODUCER_ID_AUTHORIZATION_FAILED)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new InitProducerIdResponse(requestThrottleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED)) } else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) { // Send response callback def sendResponseCallback(result: InitProducerIdResult): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch) + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch) trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") responseBody } @@ -1438,7 +1430,8 @@ class KafkaApis(val requestChannel: RequestChannel, } 
txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) }else - sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new InitProducerIdResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new InitProducerIdResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) } def handleEndTxnRequest(request: RequestChannel.Request): Unit = { @@ -1447,8 +1440,8 @@ class KafkaApis(val requestChannel: RequestChannel, if(authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) { def sendResponseCallback(error: Errors) { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody = new EndTxnResponse(throttleTimeMs, error) + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = new EndTxnResponse(requestThrottleMs, error) trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.") responseBody } @@ -1461,7 +1454,8 @@ class KafkaApis(val requestChannel: RequestChannel, endTxnRequest.command, sendResponseCallback) } else - sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new EndTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new EndTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) } def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = { @@ -1472,7 +1466,7 @@ class KafkaApis(val requestChannel: RequestChannel, val numAppends = new AtomicInteger(markers.size) if (numAppends.get == 0) { - sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) + sendResponseExemptThrottle(RequestChannel.Response(request, new 
WriteTxnMarkersResponse(errors))) return } @@ -1497,7 +1491,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (numAppends.decrementAndGet() == 0) - sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) + sendResponseExemptThrottle(RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) } // TODO: The current append API makes doing separate writes per producerId a little easier, but it would @@ -1531,7 +1525,8 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionsToAdd = addPartitionsToTxnRequest.partitions if(!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) - sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => addPartitionsToTxnRequest.getErrorResponse(1, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception())) + sendResponseMaybeThrottle(request, requestThrottleMs => + addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception())) else { val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())} @@ -1554,12 +1549,13 @@ class KafkaApis(val requestChannel: RequestChannel, nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++ internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) } - sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddPartitionsToTxnResponse(throttleTimeMs, partitionErrors.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) } else { // Send response callback def sendResponseCallback(error: Errors): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, + def createResponse(requestThrottleMs: Int): 
AbstractResponse = { + val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs, partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava) trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}") responseBody @@ -1586,14 +1582,16 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) - sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) else if (!authorize(request.session, Read, new Resource(Group, groupId))) - sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) else { // Send response callback def sendResponseCallback(error: Errors): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error) + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error) trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}") responseBody } @@ -1618,8 +1616,8 @@ class KafkaApis(val requestChannel: RequestChannel, val results = 
txnOffsetCommitRequest.offsets.keySet.asScala.map { topicPartition => (topicPartition, error) }.toMap - def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, results.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new TxnOffsetCommitResponse(requestThrottleMs, results.asJava)) } else { val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition { case (topicPartition, _) => @@ -1649,8 +1647,8 @@ class KafkaApis(val requestChannel: RequestChannel, s"on partition $topicPartition failed due to ${error.exceptionName}") } } - def createResponse(throttleTimeMs: Int): AbstractResponse = new TxnOffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) } if (authorizedTopics.isEmpty) @@ -1685,29 +1683,24 @@ class KafkaApis(val requestChannel: RequestChannel, val describeAclsRequest = request.body[DescribeAclsRequest] authorizer match { case None => - def createResponse(throttleTimeMs: Int): AbstractResponse = - new DescribeAclsResponse(throttleTimeMs, new SecurityDisabledException( - "No Authorizer is configured on the broker."), Collections.emptySet[AclBinding]); - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new DescribeAclsResponse(requestThrottleMs, + new SecurityDisabledException("No Authorizer is configured on the broker."), + Collections.emptySet())) case Some(auth) => val filter = describeAclsRequest.filter() - var returnedAcls = new util.ArrayList[AclBinding] - val aclMap : Map[Resource, Set[Acl]] = auth.getAcls() - aclMap.foreach { - case (resource, acls) => { - acls.foreach { - case (acl) => { - val 
fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), - new AccessControlEntry(acl.principal.toString(), acl.host.toString(), acl.operation.toJava, acl.permissionType.toJava)) - if (filter.matches(fixture)) - returnedAcls.add(fixture) - } - } + val returnedAcls = new util.ArrayList[AclBinding] + val aclMap = auth.getAcls() + aclMap.foreach { case (resource, acls) => + acls.foreach { acl => + val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), + new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava)) + if (filter.matches(fixture)) + returnedAcls.add(fixture) } } - def createResponse(throttleTimeMs: Int): AbstractResponse = - new DescribeAclsResponse(throttleTimeMs, null, returnedAcls) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new DescribeAclsResponse(requestThrottleMs, null, returnedAcls)) } } @@ -1780,14 +1773,13 @@ class KafkaApis(val requestChannel: RequestChannel, val createAclsRequest = request.body[CreateAclsRequest] authorizer match { case None => - def createResponse(throttleTimeMs: Int): AbstractResponse = - createAclsRequest.getErrorResponse(throttleTimeMs, - new SecurityDisabledException("No Authorizer is configured on the broker.")) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + createAclsRequest.getErrorResponse(requestThrottleMs, + new SecurityDisabledException("No Authorizer is configured on the broker."))) case Some(auth) => val errors = mutable.HashMap[Int, Throwable]() - var creations = ListBuffer[(Resource, Acl)]() - for (i <- 0 to createAclsRequest.aclCreations().size() - 1) { + val creations = ListBuffer[(Resource, Acl)]() + for (i <- 0 until createAclsRequest.aclCreations.size) { val result = 
toScala(createAclsRequest.aclCreations.get(i).acl.toFilter) result match { case Failure(throwable) => errors.put(i, throwable) @@ -1796,7 +1788,7 @@ class KafkaApis(val requestChannel: RequestChannel, !resource.name.equals(Resource.ClusterResourceName)) throw new InvalidRequestException("The only valid name for the CLUSTER resource is " + Resource.ClusterResourceName) - if (resource.name.isEmpty()) + if (resource.name.isEmpty) throw new InvalidRequestException("Invalid empty resource name") auth.addAcls(immutable.Set(acl), resource) } catch { @@ -1804,16 +1796,15 @@ class KafkaApis(val requestChannel: RequestChannel, } } } - var aclCreationResults = new java.util.ArrayList[AclCreationResponse] + val aclCreationResults = new java.util.ArrayList[AclCreationResponse] for (i <- 0 to createAclsRequest.aclCreations().size() - 1) { errors.get(i) match { case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable)) case None => aclCreationResults.add(new AclCreationResponse(null)) } } - def createResponse(throttleTimeMs: Int): AbstractResponse = - new CreateAclsResponse(throttleTimeMs, aclCreationResults) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + new CreateAclsResponse(requestThrottleMs, aclCreationResults)) } } @@ -1822,33 +1813,28 @@ class KafkaApis(val requestChannel: RequestChannel, val deleteAclsRequest = request.body[DeleteAclsRequest] authorizer match { case None => - def createResponse(throttleTimeMs: Int): AbstractResponse = - deleteAclsRequest.getErrorResponse(throttleTimeMs, - new SecurityDisabledException("No Authorizer is configured on the broker.")) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => + deleteAclsRequest.getErrorResponse(requestThrottleMs, + new SecurityDisabledException("No Authorizer is configured on the broker."))) case Some(auth) => val filterResponseMap = mutable.HashMap[Int, 
AclFilterResponse]() - var toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]() + val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]() for (i <- 0 to deleteAclsRequest.filters().size - 1) { toDelete.put(i, new ListBuffer[(Resource, Acl)]()) } if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) { // Delete based on filters that may match more than one ACL. val aclMap : Map[Resource, Set[Acl]] = auth.getAcls() - aclMap.foreach { - case (resource, acls) => { - acls.foreach { - case (acl) => { - val binding = new AclBinding(new AdminResource(AdminResourceType. - fromString(resource.resourceType.toString), resource.name), - new AccessControlEntry(acl.principal.toString(), acl.host.toString(), - acl.operation.toJava, acl.permissionType.toJava)) - for (i <- 0 to deleteAclsRequest.filters().size - 1) { - val filter = deleteAclsRequest.filters().get(i) - if (filter.matches(binding)) { - toDelete.get(i).get += ((resource, acl)) - } - } + aclMap.foreach { case (resource, acls) => + acls.foreach { acl => + val binding = new AclBinding(new AdminResource(AdminResourceType. 
+ fromString(resource.resourceType.toString), resource.name), + new AccessControlEntry(acl.principal.toString(), acl.host.toString(), + acl.operation.toJava, acl.permissionType.toJava)) + for (i <- 0 to deleteAclsRequest.filters().size - 1) { + val filter = deleteAclsRequest.filters().get(i) + if (filter.matches(binding)) { + toDelete.get(i).get += ((resource, acl)) } } } @@ -1885,9 +1871,7 @@ class KafkaApis(val requestChannel: RequestChannel, filterResponses.add(filterResponseMap.getOrElse(i, new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]()))) } - def createResponse(throttleTimeMs: Int): AbstractResponse = - new DeleteAclsResponse(throttleTimeMs, filterResponses) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, requestThrottleMs => new DeleteAclsResponse(requestThrottleMs, filterResponses)) } } @@ -1899,13 +1883,13 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new OffsetsForLeaderEpochResponse( replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava ) - sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody)) + sendResponseExemptThrottle(RequestChannel.Response(request, responseBody)) } private def handleError(request: RequestChannel.Request, e: Throwable) { val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !ApiKeys.forId(request.requestId).clusterAction if (request.requestObj != null) { - def sendResponseCallback(throttleTimeMs: Int) { + def sendResponseCallback(requestThrottleMs: Int) { request.requestObj.handleError(e, requestChannel, request) error("Error when handling request %s".format(request.requestObj), e) } @@ -1920,20 +1904,21 @@ class KafkaApis(val requestChannel: RequestChannel, } else sendResponseExemptThrottle(request, () => sendResponseCallback(0)) } else { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val response = request.body[AbstractRequest].getErrorResponse(throttleTimeMs, e) 
- + def createResponse(requestThrottleMs: Int): RequestChannel.Response = { + val response = request.body[AbstractRequest].getErrorResponse(requestThrottleMs, e) /* If request doesn't have a default error response, we just close the connection. For example, when produce request has acks set to 0 */ if (response == null) - requestChannel.closeConnection(request.processor, request) - response + new RequestChannel.Response(request, None, RequestChannel.CloseConnectionAction) + else RequestChannel.Response(request, response) } error("Error when handling request %s".format(request.body[AbstractRequest]), e) if (mayThrottle) - sendResponseMaybeThrottle(request, createResponse) + sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs => + requestChannel.sendResponse(createResponse(requestThrottleMs)) + }) else - sendResponseExemptThrottle(request, new RequestChannel.Response(request, createResponse(0))) + sendResponseExemptThrottle(createResponse(0)) } } @@ -1954,7 +1939,8 @@ class KafkaApis(val requestChannel: RequestChannel, val unauthorizedResult = unauthorizedResources.keys.map { resource => resource -> configsAuthorizationApiError(request.session, resource) } - sendResponseMaybeThrottle(request, new AlterConfigsResponse(_, (authorizedResult ++ unauthorizedResult).asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new AlterConfigsResponse(requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava)) } private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = { @@ -1992,7 +1978,8 @@ class KafkaApis(val requestChannel: RequestChannel, resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) } - sendResponseMaybeThrottle(request, new DescribeConfigsResponse(_, (authorizedConfigs ++ unauthorizedConfigs).asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => + new DescribeConfigsResponse(requestThrottleMs, 
(authorizedConfigs ++ unauthorizedConfigs).asJava)) } def authorizeClusterAction(request: RequestChannel.Request): Unit = { @@ -2001,12 +1988,9 @@ class KafkaApis(val requestChannel: RequestChannel, } private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse) { - def sendResponseCallback(throttleTimeMs: Int) { - val response = createResponse(throttleTimeMs) - if (response != null) - sendResponse(request, response) - } - sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback) + sendResponseMaybeThrottle(request, request.header.clientId, { requestThrottleMs => + sendResponse(request, createResponse(requestThrottleMs)) + }) } private def sendResponseMaybeThrottle(request: RequestChannel.Request, clientId: String, sendResponseCallback: Int => Unit) { @@ -2027,8 +2011,8 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback) } - private def sendResponseExemptThrottle(request: RequestChannel.Request, response: Response) { - sendResponseExemptThrottle(request, () => requestChannel.sendResponse(response)) + private def sendResponseExemptThrottle(response: RequestChannel.Response) { + sendResponseExemptThrottle(response.request, () => requestChannel.sendResponse(response)) } private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback: () => Unit) { @@ -2042,7 +2026,7 @@ class KafkaApis(val requestChannel: RequestChannel, } private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) { - requestChannel.sendResponse(new Response(request, response)) + requestChannel.sendResponse(RequestChannel.Response(request, response)) } private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond http://git-wip-us.apache.org/repos/asf/kafka/blob/b661d3b8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7678550..acf96e8 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -30,7 +30,7 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ListenerName, NetworkSend} +import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send} import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader} @@ -98,7 +98,7 @@ class SocketServerTest extends JUnitSuite { byteBuffer.rewind() val send = new NetworkSend(request.connectionId, byteBuffer) - channel.sendResponse(new RequestChannel.Response(request, send)) + channel.sendResponse(RequestChannel.Response(request, send)) } def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { @@ -173,13 +173,13 @@ class SocketServerTest extends JUnitSuite { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) val serializedBytes = producerRequestBytes - for (i <- 0 until 10) + for (_ <- 0 until 10) sendRequest(plainSocket, serializedBytes) plainSocket.close() - for (i <- 0 until 10) { + for (_ <- 0 until 10) { val request = server.requestChannel.receiveRequest(2000) assertNotNull("receiveRequest timed out", request) - server.requestChannel.noOperation(request.processor, request) + server.requestChannel.sendResponse(RequestChannel.Response(request, None, RequestChannel.NoOpAction)) } } @@ -331,9 +331,9 @@ class SocketServerTest extends JUnitSuite { protocol: SecurityProtocol): 
Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) { - override protected[network] def sendResponse(response: RequestChannel.Response) { + override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { conn.close() - super.sendResponse(response) + super.sendResponse(response, responseSend) } } } @@ -357,7 +357,7 @@ class SocketServerTest extends JUnitSuite { // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an // IOException. val send = new NetworkSend(request.connectionId, ByteBuffer.allocate(550000)) - channel.sendResponse(new RequestChannel.Response(request, send)) + channel.sendResponse(RequestChannel.Response(request, send)) TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount, s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
