Updated Branches:
refs/heads/0.8 bd262ac70 - 5a50f7e55
kafka-846; AbstractFetcherThread should do shallow instead of deep iteration;
patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a50f7e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a50f7e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a50f7e5
Branch: refs/heads/0.8
Commit: 5a50f7e10b8cde08cd588cf4b67b06484b16
Parents: bd262ac
Author: Jun Rao jun...@gmail.com
Authored: Wed Apr 3 17:15:06 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed Apr 3 17:15:06 2013 -0700
--
.../scala/kafka/consumer/PartitionTopicInfo.scala | 17 ++---
.../scala/kafka/server/AbstractFetcherThread.scala | 28 +--
2 files changed, 20 insertions(+), 25 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
--
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9792244..64b702b 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -50,12 +50,12 @@ class PartitionTopicInfo(val topic: String,
}
/**
- * Enqueue a message set for processing
+ * Enqueue a message set for processing.
*/
def enqueue(messages: ByteBufferMessageSet) {
-val size = messages.sizeInBytes
+val size = messages.validBytes
if(size > 0) {
- val next = nextOffset(messages)
+ val next = messages.shallowIterator.toSeq.last.nextOffset
trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
fetchedOffset.set(next)
@@ -65,17 +65,6 @@ class PartitionTopicInfo(val topic: String,
}
}
- /**
- * Get the next fetch offset after this message set
- */
- private def nextOffset(messages: ByteBufferMessageSet): Long = {
-var nextOffset = PartitionTopicInfo.InvalidOffset
-val iter = messages.shallowIterator
-while(iter.hasNext)
- nextOffset = iter.next.nextOffset
-nextOffset
- }
-
override def toString(): String = topic + ":" + partitionId.toString +
": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
--
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 087979f..cfa7747 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.Broker
-import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import collection.mutable
import kafka.message.ByteBufferMessageSet
import kafka.message.MessageAndOffset
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData,
FetchRequestBuilder}
+import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition,
ErrorMapping}
/**
@@ -118,17 +118,23 @@ abstract class AbstractFetcherThread(name: String,
clientId: String, sourceBroke
if (currentOffset.isDefined &&
fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
partitionData.error match {
case ErrorMapping.NoError =>
- val messages =
partitionData.messages.asInstanceOf[ByteBufferMessageSet]
- val validBytes = messages.validBytes
- val newOffset = messages.lastOption match {
-case Some(m: MessageAndOffset) => m.nextOffset
-case None => currentOffset.get
+ try {
+val messages =
partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+val validBytes = messages.validBytes
+val newOffset = messages.shallowIterator.toSeq.lastOption
match {
+ case Some(m: MessageAndOffset) => m.nextOffset
+ case None => currentOffset.get
+}
+partitionMap.put(topicAndPartition, newOffset)
+fetcherLagStats.getFetcherLagStats(topic, partitionId).lag
=