Repository: kafka
Updated Branches:
  refs/heads/0.9.0 5b5f002fc -> 2a9869e91
KAFKA-3010; Include error in log when ack 0 I verified this by trying to produce to __consumer_offsets and the logged message looks like: [2015-12-22 10:34:40,897] INFO [KafkaApi-0] Closing connection due to error during produce request with correlation id 1 from client id console-producer with ack=0 Topic and partition to exceptions: [__consumer_offsets,43] -> kafka.common.InvalidTopicException (kafka.server.KafkaApis) Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #709 from ijuma/kafka-3010-include-error-in-log-when-ack-0 (cherry picked from commit 5db147c1d453c9dabcc277bd95435f17201b9c1c) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a9869e9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a9869e9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a9869e9 Branch: refs/heads/0.9.0 Commit: 2a9869e91362ad8e28d54382592dde14583dcba8 Parents: 5b5f002 Author: Ismael Juma <[email protected]> Authored: Tue Jan 5 11:39:56 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Tue Jan 5 11:40:04 2016 -0800 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/KafkaApis.scala | 31 +++++++++----------- 1 file changed, 14 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2a9869e9/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ade879b..d30bc2d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -208,10 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel, val mergedCommitStatus = commitStatus 
++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode) mergedCommitStatus.foreach { case (topicAndPartition, errorCode) => - // we only print warnings for known errors here; only replica manager could see an unknown - // exception while trying to write the offset message to the local log, and it will log - // an error message and write the error code in this case; hence it can be ignored here - if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) { + if (errorCode != ErrorMapping.NoError) { debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) @@ -305,19 +302,19 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { - var errorInResponse = false + val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1)) + var errorInResponse = false + mergedResponseStatus.foreach { case (topicAndPartition, status) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager - if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + if (status.error != ErrorMapping.NoError) { + errorInResponse = true debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( produceRequest.correlationId, produceRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) - errorInResponse = true } } @@ -328,10 +325,14 @@ class KafkaApis(val requestChannel: RequestChannel, // the request, since no response is expected by the producer, the server will close socket server so that // 
the producer client will know that some error has happened and will refresh its metadata if (errorInResponse) { + val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) => + topicAndPartition -> ErrorMapping.exceptionNameFor(status.error) + }.mkString(", ") info( - "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format( - produceRequest.correlationId, - produceRequest.clientId)) + s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " + + s"from client id ${produceRequest.clientId} with ack=0\n" + + s"Topic and partition to exceptions: $exceptionsSummary" + ) requestChannel.closeConnection(request.processor, request) } else { requestChannel.noOperation(request.processor, request) @@ -358,8 +359,6 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedRequestInfo.isEmpty) sendResponseCallback(Map.empty) else { - // only allow appending to internal topic partitions - // if the client is not from admin val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId // call the replica manager to append messages to the replicas @@ -394,9 +393,7 @@ class KafkaApis(val requestChannel: RequestChannel, val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus mergedResponseStatus.foreach { case (topicAndPartition, data) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager already and hence can be ignored here - if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) { + if (data.error != ErrorMapping.NoError) { debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s" .format(fetchRequest.correlationId, fetchRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
