Repository: kafka
Updated Branches:
  refs/heads/trunk 77fa0b116 -> 2885bc33d


KAFKA-3580; Improve error logging in ReplicaFetchThread

Author: Manikumar reddy O <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1237 from omkreddy/KAFKA-3580


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2885bc33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2885bc33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2885bc33

Branch: refs/heads/trunk
Commit: 2885bc33daaf75477bf39a92d1d1da02c0e03eaa
Parents: 77fa0b1
Author: Manikumar reddy O <[email protected]>
Authored: Wed Apr 27 06:53:16 2016 -0700
Committer: Ismael Juma <[email protected]>
Committed: Wed Apr 27 06:53:16 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/ReplicaFetcherThread.scala  | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2885bc33/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 84f2e12..d58f120 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -115,10 +115,10 @@ class ReplicaFetcherThread(name: String,
       val TopicAndPartition(topic, partitionId) = topicAndPartition
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.toByteBufferMessageSet
-      warnIfMessageOversized(messageSet)
+      warnIfMessageOversized(messageSet, topicAndPartition)
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
-        throw new RuntimeException("Offset mismatch: fetched offset = %d, log 
end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
+        throw new RuntimeException("Offset mismatch for partition %s: fetched 
offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, 
replica.logEndOffset.messageOffset))
       if (logger.isTraceEnabled)
         trace("Follower %d has replica log end offset %d for partition %s. 
Received %d messages and leader hw %d"
           .format(replica.brokerId, replica.logEndOffset.messageOffset, 
topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark))
@@ -136,15 +136,15 @@ class ReplicaFetcherThread(name: String,
           .format(replica.brokerId, topic, partitionId, followerHighWatermark))
     } catch {
       case e: KafkaStorageException =>
-        fatal("Disk error while replicating data.", e)
+        fatal(s"Disk error while replicating data for $topicAndPartition", e)
         Runtime.getRuntime.halt(1)
     }
   }
 
-  def warnIfMessageOversized(messageSet: ByteBufferMessageSet): Unit = {
+  def warnIfMessageOversized(messageSet: ByteBufferMessageSet, 
topicAndPartition: TopicAndPartition): Unit = {
     if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
-      error("Replication is failing due to a message that is greater than 
replica.fetch.max.bytes. This " +
-        "generally occurs when the max.message.bytes has been overridden to 
exceed this value and a suitably large " +
+      error(s"Replication is failing due to a message that is greater than 
replica.fetch.max.bytes for partition $topicAndPartition. " +
+        "This generally occurs when the max.message.bytes has been overridden 
to exceed this value and a suitably large " +
         "message has also been sent. To fix this problem increase 
replica.fetch.max.bytes in your broker config to be " +
         "equal or larger than your settings for max.message.bytes, both at a 
broker and topic level.")
   }

Reply via email to