Repository: kafka Updated Branches: refs/heads/trunk 77fa0b116 -> 2885bc33d
KAFKA-3580; Improve error logging in ReplicaFetchThread Author: Manikumar reddy O <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1237 from omkreddy/KAFKA-3580 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2885bc33 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2885bc33 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2885bc33 Branch: refs/heads/trunk Commit: 2885bc33daaf75477bf39a92d1d1da02c0e03eaa Parents: 77fa0b1 Author: Manikumar reddy O <[email protected]> Authored: Wed Apr 27 06:53:16 2016 -0700 Committer: Ismael Juma <[email protected]> Committed: Wed Apr 27 06:53:16 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/server/ReplicaFetcherThread.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2885bc33/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 84f2e12..d58f120 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -115,10 +115,10 @@ class ReplicaFetcherThread(name: String, val TopicAndPartition(topic, partitionId) = topicAndPartition val replica = replicaMgr.getReplica(topic, partitionId).get val messageSet = partitionData.toByteBufferMessageSet - warnIfMessageOversized(messageSet) + warnIfMessageOversized(messageSet, topicAndPartition) if (fetchOffset != replica.logEndOffset.messageOffset) - throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset)) + throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, replica.logEndOffset.messageOffset)) if (logger.isTraceEnabled) trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark)) @@ -136,15 +136,15 @@ class ReplicaFetcherThread(name: String, .format(replica.brokerId, topic, partitionId, followerHighWatermark)) } catch { case e: KafkaStorageException => - fatal("Disk error while replicating data.", e) + fatal(s"Disk error while replicating data for $topicAndPartition", e) Runtime.getRuntime.halt(1) } } - def warnIfMessageOversized(messageSet: ByteBufferMessageSet): Unit = { + def warnIfMessageOversized(messageSet: ByteBufferMessageSet, topicAndPartition: TopicAndPartition): Unit = { if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0) - error("Replication is failing due to a message that is greater than replica.fetch.max.bytes. This " + - "generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + + error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicAndPartition. " + + "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " + "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") }
