Repository: kafka
Updated Branches:
  refs/heads/0.9.0 99ed1e37c -> 728e14e25


KAFKA-3159; stale high watermark segment offset causes early fetch return

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #884 from hachikuji/K3159

(cherry picked from commit 6d0dca7345d9e3c0b8924496a4632954ca1268e5)
Signed-off-by: Jun Rao <jun...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/728e14e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/728e14e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/728e14e2

Branch: refs/heads/0.9.0
Commit: 728e14e25a414b2ffb525845bb382317d833dd8d
Parents: 99ed1e3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 9 16:57:53 2016 -0600
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Feb 9 16:58:04 2016 -0600

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  2 +-
 .../main/scala/kafka/server/DelayedFetch.scala  | 29 ++++++++++++--------
 .../scala/kafka/server/LogOffsetMetadata.scala  | 19 ++-----------
 3 files changed, 21 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/728e14e2/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3805dcc..8698bff 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -353,7 +353,7 @@ class Partition(val topic: String,
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if(oldHighWatermark.precedes(newHighWatermark)) {
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
       debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
       true

http://git-wip-us.apache.org/repos/asf/kafka/blob/728e14e2/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index de6cf5b..83d0f5f 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -82,18 +82,23 @@ class DelayedFetch(delayMs: Long,
               else
                 replica.logEndOffset
 
-            if (endOffset.offsetOnOlderSegment(fetchOffset)) {
-              // Case C, this can happen when the new fetch operation is on a truncated leader
-              debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
-              return forceComplete()
-            } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
-              // Case C, this can happen when the fetch operation is falling behind the current segment
-              // or the partition has just rolled a new segment
-              debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-              return forceComplete()
-            } else if (fetchOffset.precedes(endOffset)) {
-              // we need take the partition fetch size as upper bound when accumulating the bytes
-              accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+            // Go directly to the check for Case D if the message offsets are the same. If the log segment
+            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+            // which would incorrectly be seen as an instance of Case C.
+            if (endOffset.messageOffset != fetchOffset.messageOffset) {
+              if (endOffset.onOlderSegment(fetchOffset)) {
+                // Case C, this can happen when the new fetch operation is on a truncated leader
+                debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
+                return forceComplete()
+              } else if (fetchOffset.onOlderSegment(endOffset)) {
+                // Case C, this can happen when the fetch operation is falling behind the current segment
+                // or the partition has just rolled a new segment
+                debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
+                return forceComplete()
+              } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+                // we need take the partition fetch size as upper bound when accumulating the bytes
+                accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+              }
             }
           }
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/728e14e2/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 6add702..7067b20 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -43,7 +43,7 @@ case class LogOffsetMetadata(messageOffset: Long,
                              relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
 
   // check if this offset is already on an older segment compared with the given offset
-  def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
+  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
     if (messageOffsetOnly())
       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
@@ -51,26 +51,13 @@ case class LogOffsetMetadata(messageOffset: Long,
   }
 
   // check if this offset is on the same segment with the given offset
-  def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
+  def onSameSegment(that: LogOffsetMetadata): Boolean = {
     if (messageOffsetOnly())
       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
     this.segmentBaseOffset == that.segmentBaseOffset
   }
 
-  // check if this offset is before the given offset. We need to compare both message offset and segment base offset.
-  def precedes(that: LogOffsetMetadata): Boolean = {
-    this.messageOffset < that.messageOffset || {
-      if (this.messageOffsetOnly() && that.messageOffsetOnly())
-        false
-      else if (!this.messageOffsetOnly() && !that.messageOffsetOnly())
-        this.segmentBaseOffset < that.segmentBaseOffset
-      else
-        throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.")
-    }
-  }
-
-
   // compute the number of messages between this offset to the given offset
   def offsetDiff(that: LogOffsetMetadata): Long = {
     this.messageOffset - that.messageOffset
@@ -79,7 +66,7 @@ case class LogOffsetMetadata(messageOffset: Long,
   // compute the number of bytes between this offset to the given offset
   // if they are on the same segment and this offset precedes the given offset
   def positionDiff(that: LogOffsetMetadata): Int = {
-    if(!offsetOnSameSegment(that))
+    if(!onSameSegment(that))
       throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
     if(messageOffsetOnly())
       throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")

Reply via email to