KAFKA-861; IndexOutOfBoundsException while fetching data from leader; patched by Sriram Subramanian; reviewed by Jun Rao and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b29b6296 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b29b6296 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b29b6296 Branch: refs/heads/trunk Commit: b29b6296ac5b799a78d61920a3f4131051230a02 Parents: 158baf6 Author: Sriram Subramanian <[email protected]> Authored: Mon Apr 15 18:09:57 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Mon Apr 15 18:09:57 2013 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/message/ByteBufferMessageSet.scala | 2 +- .../main/scala/kafka/server/AbstractFetcherThread.scala | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b29b6296/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 03590ad..078ebb4 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -154,7 +154,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message return allDone() val offset = topIter.getLong() val size = topIter.getInt() - if(size < 0) + if(size < Message.MinHeaderSize) throw new InvalidMessageException("Message found with corrupt size (" + size + ")") // we have an incomplete message http://git-wip-us.apache.org/repos/asf/kafka/blob/b29b6296/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index b6845e4..fed3b86 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -19,8 +19,7 @@ package kafka.server import kafka.cluster.Broker import collection.mutable -import kafka.message.ByteBufferMessageSet -import kafka.message.MessageAndOffset +import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicLong @@ -131,8 +130,14 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicAndPartition, currentOffset.get, partitionData) } catch { + case ime: InvalidMessageException => + // we log the error and continue. This ensures two things + // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag + // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and + // should get fixed in the subsequent fetches + logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) case e => - throw new KafkaException("error processing data for topic %s partititon %d offset %d" + throw new KafkaException("error processing data for partition [%s,%d] offset %d" .format(topic, partitionId, currentOffset.get), e) } case ErrorMapping.OffsetOutOfRangeCode =>
