Repository: kafka
Updated Branches:
  refs/heads/trunk 23d607dc2 -> 5db147c1d
KAFKA-3010; Include error in log when ack 0 I verified this by trying to produce to __consumer_offsets and the logged message looks like: [2015-12-22 10:34:40,897] INFO [KafkaApi-0] Closing connection due to error during produce request with correlation id 1 from client id console-producer with ack=0 Topic and partition to exceptions: [__consumer_offsets,43] -> kafka.common.InvalidTopicException (kafka.server.KafkaApis) Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #709 from ijuma/kafka-3010-include-error-in-log-when-ack-0 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5db147c1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5db147c1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5db147c1 Branch: refs/heads/trunk Commit: 5db147c1d453c9dabcc277bd95435f17201b9c1c Parents: 23d607d Author: Ismael Juma <[email protected]> Authored: Tue Jan 5 11:39:56 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Tue Jan 5 11:39:56 2016 -0800 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/KafkaApis.scala | 31 +++++++++----------- 1 file changed, 14 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5db147c1/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8589400..cbf5031 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -218,10 +218,7 @@ class KafkaApis(val requestChannel: RequestChannel, val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode) mergedCommitStatus.foreach { case 
(topicAndPartition, errorCode) => - // we only print warnings for known errors here; only replica manager could see an unknown - // exception while trying to write the offset message to the local log, and it will log - // an error message and write the error code in this case; hence it can be ignored here - if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) { + if (errorCode != ErrorMapping.NoError) { debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) @@ -315,19 +312,19 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { - var errorInResponse = false + val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1)) + var errorInResponse = false + mergedResponseStatus.foreach { case (topicAndPartition, status) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager - if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + if (status.error != ErrorMapping.NoError) { + errorInResponse = true debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( produceRequest.correlationId, produceRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) - errorInResponse = true } } @@ -338,10 +335,14 @@ class KafkaApis(val requestChannel: RequestChannel, // the request, since no response is expected by the producer, the server will close socket server so that // the producer client will know that some error has happened and will refresh its metadata if (errorInResponse) { 
+ val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) => + topicAndPartition -> ErrorMapping.exceptionNameFor(status.error) + }.mkString(", ") info( - "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format( - produceRequest.correlationId, - produceRequest.clientId)) + s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " + + s"from client id ${produceRequest.clientId} with ack=0\n" + + s"Topic and partition to exceptions: $exceptionsSummary" + ) requestChannel.closeConnection(request.processor, request) } else { requestChannel.noOperation(request.processor, request) @@ -368,8 +369,6 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedRequestInfo.isEmpty) sendResponseCallback(Map.empty) else { - // only allow appending to internal topic partitions - // if the client is not from admin val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId // call the replica manager to append messages to the replicas @@ -404,9 +403,7 @@ class KafkaApis(val requestChannel: RequestChannel, val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus mergedResponseStatus.foreach { case (topicAndPartition, data) => - // we only print warnings for known errors here; if it is unknown, it will cause - // an error message in the replica manager already and hence can be ignored here - if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) { + if (data.error != ErrorMapping.NoError) { debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s" .format(fetchRequest.correlationId, fetchRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))
