Repository: kafka
Updated Branches:
  refs/heads/trunk 693d4ca1a -> f1110c3fb


KAFKA-2477: Fix a race condition between log append and fetch that causes OffsetOutOfRangeException.

Tried two fixes. I prefer the second approach because it saves an additional offset search.
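
As a rough illustration of the failing interleaving (a minimal sketch with made-up
names, not the real kafka.log.Log API): append writes into the active segment first
and only afterwards publishes the new end offset, so a fetch that lands between the
two steps can return a message the published end offset does not yet cover, and the
consumer's follow-up fetch then starts past the published end offset and is rejected.

    // Minimal sketch of the race; RaceSketch, appendRecord and fetchFrom are
    // hypothetical names, not Kafka classes.
    object RaceSketch {
      @volatile private var publishedEndOffset = 100L             // stands in for nextOffsetMetadata.messageOffset
      private var segment = Vector.tabulate(100)(i => s"msg-$i")  // physical contents of the active segment

      def appendRecord(record: String): Unit = {
        segment = segment :+ record          // step 1: the record is physically in the segment
        // a fetch running here already sees offset 100 in the segment ...
        publishedEndOffset = segment.size    // step 2: ... before the new end offset is published
      }

      def fetchFrom(startOffset: Long): Vector[String] = {
        if (startOffset > publishedEndOffset)  // the check that raises OffsetOutOfRangeException in Kafka
          throw new IllegalStateException(s"offset $startOffset beyond log end $publishedEndOffset")
        segment.drop(startOffset.toInt)        // unfixed behaviour: may hand out records beyond publishedEndOffset
      }
    }

A fetch at offset 100 between the two steps returns msg-100, the consumer advances to
101, and its next fetch arrives before step 2 completes, so the range check throws.
The patch below closes the window by never exposing bytes past the position recorded
in nextOffsetMetadata.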

Author: Jiangjie Qin <becket....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #204 from becketqin/KAFKA-2477


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1110c3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1110c3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1110c3f

Branch: refs/heads/trunk
Commit: f1110c3fbb166f94204b6bb18bc4e1a9100d3c4e
Parents: 693d4ca
Author: Jiangjie Qin <becket....@gmail.com>
Authored: Wed Oct 7 21:59:14 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Oct 7 21:59:14 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala        | 39 ++++++++++++++++-----
 core/src/main/scala/kafka/log/LogSegment.scala |  9 ++---
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1110c3f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e5e8007..02205c9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -470,22 +470,41 @@ class Log(val dir: File,
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d 
bytes".format(maxLength, startOffset, name, size))
 
-    // check if the offset is valid and in range
-    val next = nextOffsetMetadata.messageOffset
+    // Because we don't use a lock for reading, the synchronization is a little bit tricky.
+    // We create local variables to avoid race conditions with updates to the log.
+    val currentNextOffsetMetadata = nextOffsetMetadata
+    val next = currentNextOffsetMetadata.messageOffset
     if(startOffset == next)
-      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
-    
+      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
+
     var entry = segments.floorEntry(startOffset)
-      
+
     // attempt to read beyond the log end offset is an error
     if(startOffset > next || entry == null)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
     
-    // do the read on the segment with a base offset less than the target offset
+    // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
     while(entry != null) {
-      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
+      // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
+      // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
+      // cause an OffsetOutOfRangeException. To solve that, we cap the read at the exposed position instead of the log
+      // end of the active segment.
+      val maxPosition = {
+        if (entry == segments.lastEntry) {
+          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
+          // Check the segment again in case a new segment has just rolled out.
+          if (entry != segments.lastEntry)
+            // New log segment has rolled out, we can read up to the file end.
+            entry.getValue.size
+          else
+            exposedPos
+        } else {
+          entry.getValue.size
+        }
+      }
+      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
       if(fetchInfo == null) {
         entry = segments.higherEntry(entry.getKey)
       } else {
@@ -622,12 +641,14 @@ class Log(val dir: File,
       val prev = addSegment(segment)
       if(prev != null)
         throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
-      
+      // We need to update the segment base offset and append position data of the metadata when the log rolls.
+      // The next offset should not change.
+      updateLogEndOffset(nextOffsetMetadata.messageOffset)
       // schedule an asynchronous flush of the old segment
       scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
       
       info("Rolled new log segment for '" + name + "' in %.0f 
ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
-      
+
       segment
     }
   }
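
A note on the re-check of segments.lastEntry in the hunk above: a roll can complete
between the first lastEntry comparison and the read of nextOffsetMetadata, in which
case the snapshotted exposed position already refers to the new active segment and
cannot bound a read of the old one. In isolation the pattern looks roughly like the
sketch below (Segment and the parameter names are hypothetical stand-ins, not
kafka.log.LogSegment):

    object ReadBoundSketch {
      // Hypothetical stand-in for a segment; only its size matters here.
      final class Segment(val size: Long)

      // "Snapshot, then re-check" upper bound for reading what may be the active segment.
      def readUpperBound(entry: Segment,
                         activeSegment: () => Segment,
                         exposedPositionOfActive: () => Long): Long =
        if (entry eq activeSegment()) {
          val exposedPos = exposedPositionOfActive()  // snapshot the position published by the appender
          if (entry ne activeSegment()) entry.size    // a concurrent roll finished; the segment is now immutable, read it fully
          else exposedPos                             // still the active segment: cap the read at the snapshot
        } else {
          entry.size                                  // an older, closed segment: safe to read to the end of the file
        }
    }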

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1110c3f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 1377e8f..4de4c2b 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -112,12 +112,13 @@ class LogSegment(val log: FileMessageSet,
    * @param startOffset A lower bound on the first offset to include in the message set we read
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
+   * @param maxPosition An optional maximum position in the log segment that should be exposed for read.
    * 
    * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
     if(maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
 
@@ -138,8 +139,8 @@ class LogSegment(val log: FileMessageSet,
     val length = 
       maxOffset match {
         case None =>
-          // no max offset, just use the max size they gave unmolested
-          maxSize
+          // no max offset, just read until the max position
+          min((maxPosition - startPosition.position).toInt, maxSize)
         case Some(offset) => {
           // there is a max offset, translate it to a file position and use that to calculate the max read size
           if(offset < startOffset)
@@ -150,7 +151,7 @@ class LogSegment(val log: FileMessageSet,
              logSize // the max offset is off the end of the log, use the end of the file
             else
               mapping.position
-          min(endPosition - startPosition.position, maxSize) 
+          min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
         }
       }
     FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
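
To make the effect of the new maxPosition bound concrete, here is a small worked
example with made-up numbers (only scala.math.min is real; the values and value
names are hypothetical): the active segment file may already contain the bytes of a
just-appended message, but the read length is now clamped to the exposed position,
so those unpublished bytes are never returned.

    import scala.math.min

    // Hypothetical byte positions within the active segment.
    val startPosition = 1040000L   // file position of the fetch's start offset
    val exposedPos    = 1048576L   // maxPosition handed down from Log.read (nextOffsetMetadata.relativePositionInSegment)
    val logSize       = 1050000L   // bytes physically written, including an unpublished append
    val maxSize       = 64 * 1024  // bytes requested by the fetch

    // maxOffset == None branch after the patch: clamp the read to the exposed position.
    val lengthAfterPatch  = min((exposedPos - startPosition).toInt, maxSize)  // 8576 bytes
    // Before the patch this branch returned maxSize (65536) outright, so the file read
    // could run past exposedPos and return the not-yet-published bytes.
    val lengthBeforePatch = maxSize                                           // 65536 bytes, may cover unpublished data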
