Updated Branches: refs/heads/0.8 bd262ac70 -> 5a50f7e55
kafka-846; AbstractFetcherThread should do shallow instead of deep iteration; patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a50f7e5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a50f7e5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a50f7e5 Branch: refs/heads/0.8 Commit: 5a50f7e555510b8cde08cd588cf4b67b06484b16 Parents: bd262ac Author: Jun Rao <jun...@gmail.com> Authored: Wed Apr 3 17:15:06 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Apr 3 17:15:06 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/consumer/PartitionTopicInfo.scala | 17 ++------- .../scala/kafka/server/AbstractFetcherThread.scala | 28 +++++++++------ 2 files changed, 20 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 9792244..64b702b 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -50,12 +50,12 @@ class PartitionTopicInfo(val topic: String, } /** - * Enqueue a message set for processing + * Enqueue a message set for processing. */ def enqueue(messages: ByteBufferMessageSet) { - val size = messages.sizeInBytes + val size = messages.validBytes if(size > 0) { - val next = nextOffset(messages) + val next = messages.shallowIterator.toSeq.last.nextOffset trace("Updating fetch offset = " + fetchedOffset.get + " to " + next) chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) fetchedOffset.set(next) @@ -65,17 +65,6 @@ class PartitionTopicInfo(val topic: String, } } - /** - * Get the next fetch offset after this message set - */ - private def nextOffset(messages: ByteBufferMessageSet): Long = { - var nextOffset = PartitionTopicInfo.InvalidOffset - val iter = messages.shallowIterator - while(iter.hasNext) - nextOffset = iter.next.nextOffset - nextOffset - } - override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get + ": consumed offset = " + consumedOffset.get } http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 087979f..cfa7747 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.cluster.Broker -import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping} import collection.mutable import kafka.message.ByteBufferMessageSet import kafka.message.MessageAndOffset @@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} +import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} /** @@ -118,17 +118,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) { partitionData.error match { case ErrorMapping.NoError => - val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] - val validBytes = messages.validBytes - val newOffset = messages.lastOption match { - case Some(m: MessageAndOffset) => m.nextOffset - case None => currentOffset.get + try { + val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] + val validBytes = messages.validBytes + val newOffset = messages.shallowIterator.toSeq.lastOption match { + case Some(m: MessageAndOffset) => m.nextOffset + case None => currentOffset.get + } + partitionMap.put(topicAndPartition, newOffset) + fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset + fetcherStats.byteRate.mark(validBytes) + // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread + processPartitionData(topicAndPartition, currentOffset.get, partitionData) + } catch { + case e => + throw new KafkaException("error processing data for topic %s partititon %d offset %d" + .format(topic, partitionId, currentOffset.get), e) } - partitionMap.put(topicAndPartition, newOffset) - fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset - fetcherStats.byteRate.mark(validBytes) - // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread - processPartitionData(topicAndPartition, currentOffset.get, partitionData) case ErrorMapping.OffsetOutOfRangeCode => try { val newOffset = handleOffsetOutOfRange(topicAndPartition)