Incorrect index in the log of a follower; patched by Jun Rao; reviewed by Neha Narkhede and Jay Kreps; KAFKA-804


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46ebdc16
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46ebdc16
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46ebdc16

Branch: refs/heads/trunk
Commit: 46ebdc16e99c4bb9236a6c25c47ff2c97fdc0f53
Parents: d40d255
Author: Jun Rao <jun...@gmail.com>
Authored: Mon Mar 18 15:53:03 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Mar 18 15:53:03 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          |   30 ++++++++--------
 core/src/test/scala/unit/kafka/log/LogTest.scala |    8 ++--
 2 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/46ebdc16/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index f6ee475..7d71451 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -270,33 +270,32 @@ private[kafka] class Log(val dir: File,
 
       try {
         // they are valid, insert them in the log
-        val offsetsAndNumAppendedMessages = lock synchronized {
+        val offsets = lock synchronized {
           val firstOffset = nextOffset.get
 
           // maybe roll the log if this segment is full
           val segment = maybeRoll(segments.view.last)
           
           // assign offsets to the messageset
-          val offsets = 
+          val lastOffset =
             if(assignOffsets) {
               val offsetCounter = new AtomicLong(nextOffset.get)
-              val firstOffset = offsetCounter.get
               try {
                 validMessages = validMessages.assignOffsets(offsetCounter, 
messageSetInfo.codec)
               } catch {
                 case e: IOException => throw new KafkaException("Error in 
validating messages while appending to log '%s'".format(name), e)
               }
-              val lastOffset = offsetCounter.get - 1
-              val numMessages = lastOffset - firstOffset + 1
+              val assignedLastOffset = offsetCounter.get - 1
+              val numMessages = assignedLastOffset - firstOffset + 1
               
BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
               
BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
-              (firstOffset, lastOffset)
+              assignedLastOffset
             } else {
               require(messageSetInfo.offsetsMonotonic, "Out of order offsets 
found in " + messages)
               require(messageSetInfo.firstOffset >= nextOffset.get, 
                       "Attempt to append a message set beginning with offset 
%d to a log with log end offset %d."
                       .format(messageSetInfo.firstOffset, nextOffset.get))
-              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
+              messageSetInfo.lastOffset
             }
 
           // Check if the message sizes are valid. This check is done after 
assigning offsets to ensure the comparison
@@ -308,23 +307,24 @@ private[kafka] class Log(val dir: File,
           }
 
           // now append to the log
-          trace("Appending message set to %s offset: %d nextOffset: %d 
messageSet: %s"
-                .format(this.name, offsets._1, nextOffset.get(), 
validMessages))
-          segment.append(offsets._1, validMessages)
+          segment.append(firstOffset, validMessages)
           
           // advance the log end offset
-          nextOffset.set(offsets._2 + 1)
-          val numAppendedMessages = (nextOffset.get - firstOffset).toInt
+          nextOffset.set(lastOffset + 1)
+
+          trace("Appended message set to log %s with first offset: %d, next 
offset: %d, and messages: %s"
+                  .format(this.name, firstOffset, nextOffset.get(), 
validMessages))
 
           // return the offset at which the messages were appended
-          (offsets._1, offsets._2, numAppendedMessages)
+          (firstOffset, lastOffset)
         }
         
         // maybe flush the log and index
-        maybeFlush(offsetsAndNumAppendedMessages._3)
+        val numAppendedMessages = (offsets._2 - offsets._1 + 1).toInt
+        maybeFlush(numAppendedMessages)
         
         // return the first and last offset
-        (offsetsAndNumAppendedMessages._1, offsetsAndNumAppendedMessages._2)
+        offsets
       } catch {
         case e: IOException => throw new KafkaStorageException("I/O exception 
in append to log '%s'".format(name), e)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46ebdc16/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 786ae03..4ed88e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -379,15 +379,15 @@ class LogTest extends JUnitSuite {
                         needsRecovery = true)
       val messages = List("one", "two", "three", "four", "five", "six")
       val ms = new ByteBufferMessageSet(compressionCodec = codec, 
-                                        offsetCounter = new AtomicLong(5), 
+                                        offsetCounter = new AtomicLong(0),
                                         messages = messages.map(s => new 
Message(s.getBytes)):_*)
-      val firstOffset = ms.shallowIterator.toList.head.offset
-      val lastOffset = ms.shallowIterator.toList.last.offset
+      val firstOffset = ms.toList.head.offset
+      val lastOffset = ms.toList.last.offset
       val (first, last) = log.append(ms, assignOffsets = false)
       assertEquals(last + 1, log.logEndOffset)
       assertEquals(firstOffset, first)
       assertEquals(lastOffset, last)
-      assertTrue(log.read(5, 64*1024).size > 0)
+      assertTrue(log.read(0, 64*1024).size > 0)
       log.delete()
     }
   }

Reply via email to