Repository: kafka Updated Branches: refs/heads/trunk dc662776c -> 216c75bbc
KAFKA-3003: Update the replica.highWatermark correctly Author: Jiangjie Qin <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #688 from becketqin/KAFKA-3003 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/216c75bb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/216c75bb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/216c75bb Branch: refs/heads/trunk Commit: 216c75bbc567a633447b1cbf2bb8c6ba34e2464f Parents: dc66277 Author: Jiangjie Qin <[email protected]> Authored: Fri Feb 5 09:06:47 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Fri Feb 5 09:06:47 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/server/LogOffsetMetadata.scala | 22 ++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/216c75bb/core/src/main/scala/kafka/server/LogOffsetMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala index 00b60fe..6add702 100644 --- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -45,7 +45,7 @@ case class LogOffsetMetadata(messageOffset: Long, // check if this offset is already on an older segment compared with the given offset def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly()) - throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset < that.segmentBaseOffset } @@ -53,13 +53,23 @@ case class LogOffsetMetadata(messageOffset: Long, // check if this offset is on the same segment with the given offset def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly()) - throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset == that.segmentBaseOffset } - // check if this offset is before the given offset - def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset + // check if this offset is before the given offset. We need to compare both message offset and segment base offset. + def precedes(that: LogOffsetMetadata): Boolean = { + this.messageOffset < that.messageOffset || { + if (this.messageOffsetOnly() && that.messageOffsetOnly()) + false + else if (!this.messageOffsetOnly() && !that.messageOffsetOnly()) + this.segmentBaseOffset < that.segmentBaseOffset + else + throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.") + } + } + // compute the number of messages between this offset to the given offset def offsetDiff(that: LogOffsetMetadata): Long = { @@ -70,9 +80,9 @@ case class LogOffsetMetadata(messageOffset: Long, // if they are on the same segment and this offset precedes the given offset def positionDiff(that: LogOffsetMetadata): Int = { if(!offsetOnSameSegment(that)) - throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment") if(messageOffsetOnly()) - throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info") this.relativePositionInSegment - that.relativePositionInSegment }
