Repository: kafka
Updated Branches:
  refs/heads/trunk ceb10c533 -> c2ced5fb5


KAFKA-5303, KAFKA-5305: Improve logging when fetches fail in 
ReplicaFetcherThread

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>

Closes #3115 from ijuma/kafka-5305-missing-log-info-replica-fetcher


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2ced5fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2ced5fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2ced5fb

Branch: refs/heads/trunk
Commit: c2ced5fb51f2a4ec94d158e18836afb7284d26ce
Parents: ceb10c5
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Mon May 22 16:08:06 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Mon May 22 16:08:06 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala | 3 ++-
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala   | 4 ++--
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala    | 3 +++
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c2ced5fb/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ec60220..394132b 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -128,6 +128,7 @@ object ConsumerFetcherThread {
     }.toMap
     def isEmpty: Boolean = underlying.requestInfo.isEmpty
     def offset(topicPartition: TopicPartition): Long = 
tpToOffset(topicPartition)
+    override def toString = underlying.toString
   }
 
   class PartitionData(val underlying: FetchResponsePartitionData) extends 
AbstractFetcherThread.PartitionData {
@@ -136,6 +137,6 @@ object ConsumerFetcherThread {
     def highWatermark: Long = underlying.hw
     def exception: Option[Throwable] =
       if (error == Errors.NONE) None else 
Some(ErrorMapping.exceptionFor(error.code))
-
+    override def toString = underlying.toString
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2ced5fb/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 734c006..cb0680c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -147,12 +147,12 @@ abstract class AbstractFetcherThread(name: String,
     var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
     try {
-      trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, 
fetchRequest))
+      trace(s"Issuing fetch to broker ${sourceBroker.id}, request: 
$fetchRequest")
       responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn(s"Error in fetch $fetchRequest", t)
+          warn(s"Error in fetch to broker ${sourceBroker.id}, request 
${fetchRequest}", t)
           inLock(partitionMapLock) {
             
partitionStates.partitionSet.asScala.foreach(updatePartitionsWithError)
             // there is an error occurred while fetching partitions, sleep a 
while

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2ced5fb/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index bd678b3..d7420dd 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -349,6 +349,7 @@ object ReplicaFetcherThread {
     def isEmpty: Boolean = underlying.fetchData().isEmpty
     def offset(topicPartition: TopicPartition): Long =
       underlying.fetchData().asScala(topicPartition).fetchOffset
+    override def toString = underlying.toString
   }
 
   private[server] class PartitionData(val underlying: 
FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
@@ -367,5 +368,7 @@ object ReplicaFetcherThread {
       case Errors.NONE => None
       case e => Some(e.exception)
     }
+
+    override def toString = underlying.toString
   }
 }

Reply via email to