KAFKA-892 Change request log to record request completion instead of request handling; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/988d4d8e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/988d4d8e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/988d4d8e Branch: refs/heads/trunk Commit: 988d4d8e65a14390abd748318a64e281e4a37c19 Parents: cae19aa Author: Neha Narkhede <neha.narkh...@gmail.com> Authored: Tue Apr 30 17:20:54 2013 -0700 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Tue Apr 30 17:20:54 2013 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/network/RequestChannel.scala | 7 +++++-- core/src/main/scala/kafka/server/KafkaApis.scala | 4 +--- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/988d4d8e/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index c8f81c0..7b8d1f0 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,7 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.log4j.Logger object RequestChannel extends Logging { @@ -47,6 +48,7 @@ object RequestChannel extends Logging { val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer = null + private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Received request : %s".format(requestObj)) def updateRequestMetrics() { @@ -76,8 +78,9 @@ object RequestChannel extends Logging { m.responseSendTimeHist.update(responseSendTime) 
m.totalTimeHist.update(totalTime) } - trace("Completed request : %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d" - .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) + if(requestLogger.isTraceEnabled) + requestLogger.trace("Completed request:%s from client %s;totalTime:%d,queueTime:%d,localTime:%d,remoteTime:%d,sendTime:%d" + .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/988d4d8e/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb178d6..f5288bf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -46,7 +46,6 @@ class KafkaApis(val requestChannel: RequestChannel, new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) private val delayedRequestMetrics = new DelayedRequestMetrics - private val requestLogger = Logger.getLogger("kafka.request.logger") this.logIdent = "[KafkaApi-%d] ".format(brokerId) /** @@ -54,8 +53,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handle(request: RequestChannel.Request) { try{ - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) + trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request)