git commit: KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao

2013-04-03 Thread nehanarkhede
Updated Branches:
  refs/heads/0.8 3c27988ca -> bd262ac70


KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun 
Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd262ac7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd262ac7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd262ac7

Branch: refs/heads/0.8
Commit: bd262ac708062e502406e8d775f4c9432a5364e7
Parents: 3c27988
Author: Neha Narkhede neha.narkh...@gmail.com
Authored: Wed Apr 3 13:43:50 2013 -0700
Committer: Neha Narkhede neha.narkh...@gmail.com
Committed: Wed Apr 3 13:43:50 2013 -0700

--
 .../main/scala/kafka/tools/KafkaMigrationTool.java |5 -
 1 files changed, 4 insertions(+), 1 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd262ac7/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
--
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java 
b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index a15b350..3c18286 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -385,8 +385,10 @@ public class KafkaMigrationTool {
   try{
 while(true) {
  KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
-  if(!data.equals(shutdownMessage))
+  if(!data.equals(shutdownMessage)) {
 producer.send(data);
+    if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
+  }
   else
 break;
 }
@@ -410,6 +412,7 @@ public class KafkaMigrationTool {
 public void awaitShutdown() {
   try {
 shutdownComplete.await();
+producer.close();
 logger.info("Producer thread " + threadName + " shutdown complete");
   } catch(InterruptedException ie) {
 logger.warn("Interrupt during shutdown of ProducerThread", ie);



git commit: kafka-846; AbstractFetcherThread should do shallow instead of deep iteration; patched by Jun Rao; reviewed by Neha Narkhede

2013-04-03 Thread junrao
Updated Branches:
  refs/heads/0.8 bd262ac70 -> 5a50f7e55


kafka-846; AbstractFetcherThread should do shallow instead of deep iteration; 
patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a50f7e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a50f7e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a50f7e5

Branch: refs/heads/0.8
Commit: 5a50f7e10b8cde08cd588cf4b67b06484b16
Parents: bd262ac
Author: Jun Rao jun...@gmail.com
Authored: Wed Apr 3 17:15:06 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed Apr 3 17:15:06 2013 -0700

--
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |   17 ++---
 .../scala/kafka/server/AbstractFetcherThread.scala |   28 +--
 2 files changed, 20 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
--
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala 
b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9792244..64b702b 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -50,12 +50,12 @@ class PartitionTopicInfo(val topic: String,
   }
 
   /**
-   * Enqueue a message set for processing
+   * Enqueue a message set for processing.
*/
   def enqueue(messages: ByteBufferMessageSet) {
-val size = messages.sizeInBytes
+val size = messages.validBytes
 if(size > 0) {
-  val next = nextOffset(messages)
+  val next = messages.shallowIterator.toSeq.last.nextOffset
   trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
   chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
   fetchedOffset.set(next)
@@ -65,17 +65,6 @@ class PartitionTopicInfo(val topic: String,
 }
   }
   
-  /**
-   * Get the next fetch offset after this message set
-   */
-  private def nextOffset(messages: ByteBufferMessageSet): Long = {
-var nextOffset = PartitionTopicInfo.InvalidOffset
-val iter = messages.shallowIterator
-while(iter.hasNext)
-  nextOffset = iter.next.nextOffset
-nextOffset
-  }
-
   override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a50f7e5/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
--
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 087979f..cfa7747 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.cluster.Broker
-import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, 
FetchRequestBuilder}
+import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, 
ErrorMapping}
 
 
 /**
@@ -118,17 +118,23 @@ abstract class AbstractFetcherThread(name: String, 
clientId: String, sourceBroke
 if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
   partitionData.error match {
 case ErrorMapping.NoError =>
-  val messages = 
partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-  val validBytes = messages.validBytes
-  val newOffset = messages.lastOption match {
-    case Some(m: MessageAndOffset) => m.nextOffset
-    case None => currentOffset.get
+  try {
+val messages = 
partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+val validBytes = messages.validBytes
+    val newOffset = messages.shallowIterator.toSeq.lastOption match {
+      case Some(m: MessageAndOffset) => m.nextOffset
+      case None => currentOffset.get
+    }
+partitionMap.put(topicAndPartition, newOffset)
+fetcherLagStats.getFetcherLagStats(topic, partitionId).lag 
=