Repository: kafka Updated Branches: refs/heads/0.9.0 99ed1e37c -> 728e14e25
KAFKA-3159; stale high watermark segment offset causes early fetch return Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com> Closes #884 from hachikuji/K3159 (cherry picked from commit 6d0dca7345d9e3c0b8924496a4632954ca1268e5) Signed-off-by: Jun Rao <jun...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/728e14e2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/728e14e2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/728e14e2 Branch: refs/heads/0.9.0 Commit: 728e14e25a414b2ffb525845bb382317d833dd8d Parents: 99ed1e3 Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Feb 9 16:57:53 2016 -0600 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Feb 9 16:58:04 2016 -0600 ---------------------------------------------------------------------- .../main/scala/kafka/cluster/Partition.scala | 2 +- .../main/scala/kafka/server/DelayedFetch.scala | 29 ++++++++++++-------- .../scala/kafka/server/LogOffsetMetadata.scala | 19 ++----------- 3 files changed, 21 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/728e14e2/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3805dcc..8698bff 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -353,7 +353,7 @@ class Partition(val topic: String, val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark - if(oldHighWatermark.precedes(newHighWatermark)) { + if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { leaderReplica.highWatermark = newHighWatermark debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) true http://git-wip-us.apache.org/repos/asf/kafka/blob/728e14e2/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index de6cf5b..83d0f5f 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -82,18 +82,23 @@ class DelayedFetch(delayMs: Long, else replica.logEndOffset - if (endOffset.offsetOnOlderSegment(fetchOffset)) { - // Case C, this can happen when the new fetch operation is on a truncated leader - debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition)) - return forceComplete() - } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { - // Case C, this can happen when the fetch operation is falling behind the current segment - // or the partition has just rolled a new segment - debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata)) - return forceComplete() - } else if (fetchOffset.precedes(endOffset)) { - // we need take the partition fetch size as upper bound when accumulating the bytes - accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize) + // Go directly to the check for Case D if the message offsets are the same. If the log segment + // has just rolled, then the high watermark offset will remain the same but be on the old segment, + // which would incorrectly be seen as an instance of Case C. + if (endOffset.messageOffset != fetchOffset.messageOffset) { + if (endOffset.onOlderSegment(fetchOffset)) { + // Case C, this can happen when the new fetch operation is on a truncated leader + debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition)) + return forceComplete() + } else if (fetchOffset.onOlderSegment(endOffset)) { + // Case C, this can happen when the fetch operation is falling behind the current segment + // or the partition has just rolled a new segment + debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata)) + return forceComplete() + } else if (fetchOffset.messageOffset < endOffset.messageOffset) { + // we need take the partition fetch size as upper bound when accumulating the bytes + accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize) + } } } } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/728e14e2/core/src/main/scala/kafka/server/LogOffsetMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala index 6add702..7067b20 100644 --- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -43,7 +43,7 @@ case class LogOffsetMetadata(messageOffset: Long, relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) { // check if this offset is already on an older segment compared with the given offset - def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = { + def onOlderSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly()) throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") @@ -51,26 +51,13 @@ case class LogOffsetMetadata(messageOffset: Long, } // check if this offset is on the same segment with the given offset - def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = { + def onSameSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly()) throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset == that.segmentBaseOffset } - // check if this offset is before the given offset. We need to compare both message offset and segment base offset. - def precedes(that: LogOffsetMetadata): Boolean = { - this.messageOffset < that.messageOffset || { - if (this.messageOffsetOnly() && that.messageOffsetOnly()) - false - else if (!this.messageOffsetOnly() && !that.messageOffsetOnly()) - this.segmentBaseOffset < that.segmentBaseOffset - else - throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.") - } - } - - // compute the number of messages between this offset to the given offset def offsetDiff(that: LogOffsetMetadata): Long = { this.messageOffset - that.messageOffset @@ -79,7 +66,7 @@ case class LogOffsetMetadata(messageOffset: Long, // compute the number of bytes between this offset to the given offset // if they are on the same segment and this offset precedes the given offset def positionDiff(that: LogOffsetMetadata): Int = { - if(!offsetOnSameSegment(that)) + if(!onSameSegment(that)) throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment") if(messageOffsetOnly()) throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")