kafka-861; IndexOutOfBoundsException while fetching data from leader; patched 
by Sriram Subramanian; reviewed by Jun Rao and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b29b6296
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b29b6296
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b29b6296

Branch: refs/heads/trunk
Commit: b29b6296ac5b799a78d61920a3f4131051230a02
Parents: 158baf6
Author: Sriram Subramanian <[email protected]>
Authored: Mon Apr 15 18:09:57 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Apr 15 18:09:57 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/message/ByteBufferMessageSet.scala  |  2 +-
 .../main/scala/kafka/server/AbstractFetcherThread.scala  | 11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b29b6296/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 03590ad..078ebb4 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -154,7 +154,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: 
ByteBuffer) extends Message
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size < 0)
+        if(size < Message.MinHeaderSize)
           throw new InvalidMessageException("Message found with corrupt size 
(" + size + ")")
         
         // we have an incomplete message

http://git-wip-us.apache.org/repos/asf/kafka/blob/b29b6296/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b6845e4..fed3b86 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,8 +19,7 @@ package kafka.server
 
 import kafka.cluster.Broker
 import collection.mutable
-import kafka.message.ByteBufferMessageSet
-import kafka.message.MessageAndOffset
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, 
MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -131,8 +130,14 @@ abstract class AbstractFetcherThread(name: String, 
clientId: String, sourceBroke
                     // Once we hand off the partition data to the subclass, we 
can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentOffset.get, 
partitionData)
                   } catch {
+                    case ime: InvalidMessageException =>
+                      // We log the error and continue. This ensures two things:
+                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partitions to also lag.
+                      // 2. If the message is corrupt due to a transient state in the log (truncation or partial writes can cause this), we simply continue and
+                      //    it should get fixed in the subsequent fetches.
+                      logger.warn("Found invalid messages during fetch for 
partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " 
error " + ime.getMessage)
                     case e =>
-                      throw new KafkaException("error processing data for 
topic %s partititon %d offset %d"
+                      throw new KafkaException("error processing data for 
partition [%s,%d] offset %d"
                                                .format(topic, partitionId, 
currentOffset.get), e)
                   }
                 case ErrorMapping.OffsetOutOfRangeCode =>

Reply via email to