Repository: kafka
Updated Branches:
  refs/heads/0.11.0 e08e76b71 -> 4c29b1544
KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in ReplicaFetcherThread Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3115 from ijuma/kafka-5305-missing-log-info-replica-fetcher (cherry picked from commit c2ced5fb51f2a4ec94d158e18836afb7284d26ce) Signed-off-by: Rajini Sivaram <rajinisiva...@googlemail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c29b154 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c29b154 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c29b154 Branch: refs/heads/0.11.0 Commit: 4c29b1544bb058a004c57c3e559c7458810df2b4 Parents: e08e76b Author: Ismael Juma <ism...@juma.me.uk> Authored: Mon May 22 16:08:06 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Mon May 22 16:08:37 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala | 3 ++- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++-- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 3 +++ 3 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4c29b154/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index ec60220..394132b 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -128,6 +128,7 @@ object ConsumerFetcherThread { }.toMap def isEmpty: Boolean = underlying.requestInfo.isEmpty def offset(topicPartition: TopicPartition): Long = tpToOffset(topicPartition) 
+ override def toString = underlying.toString } class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData { @@ -136,6 +137,6 @@ object ConsumerFetcherThread { def highWatermark: Long = underlying.hw def exception: Option[Throwable] = if (error == Errors.NONE) None else Some(ErrorMapping.exceptionFor(error.code)) - + override def toString = underlying.toString } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c29b154/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 734c006..cb0680c 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -147,12 +147,12 @@ abstract class AbstractFetcherThread(name: String, var responseData: Seq[(TopicPartition, PD)] = Seq.empty try { - trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) + trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest") responseData = fetch(fetchRequest) } catch { case t: Throwable => if (isRunning.get) { - warn(s"Error in fetch $fetchRequest", t) + warn(s"Error in fetch to broker ${sourceBroker.id}, request ${fetchRequest}", t) inLock(partitionMapLock) { partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError) // there is an error occurred while fetching partitions, sleep a while http://git-wip-us.apache.org/repos/asf/kafka/blob/4c29b154/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index bd678b3..d7420dd 100644 --- 
a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -349,6 +349,7 @@ object ReplicaFetcherThread { def isEmpty: Boolean = underlying.fetchData().isEmpty def offset(topicPartition: TopicPartition): Long = underlying.fetchData().asScala(topicPartition).fetchOffset + override def toString = underlying.toString } private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData { @@ -367,5 +368,7 @@ object ReplicaFetcherThread { case Errors.NONE => None case e => Some(e.exception) } + + override def toString = underlying.toString } }