KAFKA-649; Clean up log4j logging (extra); patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4f287db Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4f287db Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4f287db Branch: refs/heads/trunk Commit: e4f287db6142cafdf1ddb4ebf7110180ec06f7c4 Parents: 312ed2e Author: Jun Rao <jun...@gmail.com> Authored: Wed May 29 10:00:51 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Wed May 29 10:00:51 2013 -0700 ---------------------------------------------------------------------- .../controller/PartitionLeaderSelector.scala | 7 ++++--- .../src/main/scala/kafka/server/KafkaApis.scala | 22 +++++++++++++------- core/src/main/scala/kafka/utils/ZkUtils.scala | 8 +++---- 3 files changed, 22 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 21b0e24..a47b142 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -63,13 +63,14 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten case false => ControllerStats.uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicasToThisPartition.head - warn("No broker in ISR is alive for %s. Elect leader from broker %s. There's potential data loss." - .format(topicAndPartition, newLeader)) + warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." 
+ .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(","))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) } case false => val newLeader = liveBrokersInIsr.head - debug("Some broker in ISR is alive for %s. Select %d from ISR to be the leader.".format(topicAndPartition, newLeader)) + debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." + .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) } info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 93e2f04..dd88ccd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -239,16 +239,18 @@ class KafkaApis(val requestChannel: RequestChannel, Runtime.getRuntime.halt(1) null case utpe: UnknownTopicOrPartitionException => - warn("Produce request: " + utpe.getMessage) + warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) new ProduceResult(topicAndPartition, utpe) case nle: NotLeaderForPartitionException => - warn("Produce request: " + nle.getMessage) + warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) new ProduceResult(topicAndPartition, nle) case e => 
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d" - .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e) + error("Error processing ProducerRequest with correlation id %d from client %s on partition %s" + .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e) new ProduceResult(topicAndPartition, e) } } @@ -326,10 +328,12 @@ class KafkaApis(val requestChannel: RequestChannel, // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request // for a partition it is the leader for case utpe: UnknownTopicOrPartitionException => - warn("Fetch request: " + utpe.getMessage) + warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( + fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) case nle: NotLeaderForPartitionException => - warn("Fetch request: " + nle.getMessage) + warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( + fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) case t => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() @@ -402,10 +406,12 @@ class KafkaApis(val requestChannel: RequestChannel, // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages // are typically transient and there is no value in logging the entire 
stack trace for the same case utpe: UnknownTopicOrPartitionException => - warn(utpe.getMessage) + warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case nle: NotLeaderForPartitionException => - warn(nle.getMessage) + warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case e => warn("Error while responding to offset request", e) http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4f6fcd4..3775eb4 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -333,8 +333,8 @@ object ZkUtils extends Logging { (true, stat.getVersion) } catch { case e: Exception => - error("Conditional update of path %s with data %s and expected version %d failed".format(path, data, - expectVersion), e) + error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + expectVersion, e.getMessage)) (false, -1) } } @@ -352,8 +352,8 @@ object ZkUtils extends Logging { } catch { case nne: ZkNoNodeException => throw nne case e: Exception => - error("Conditional update of path %s with data %s and expected version %d failed".format(path, data, - expectVersion), e) + error("Conditional update of path %s with data %s and expected version %d failed due 
to %s".format(path, data, + expectVersion, e.getMessage)) (false, -1) } }