Repository: kafka Updated Branches: refs/heads/0.9.0 c3f575d5f -> 64d512959
KAFKA-3003: Update the replica.highWatermark correctly Author: Jiangjie Qin <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #688 from becketqin/KAFKA-3003 (cherry picked from commit 216c75bbc567a633447b1cbf2bb8c6ba34e2464f) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64d51295 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64d51295 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64d51295 Branch: refs/heads/0.9.0 Commit: 64d51295929fb894e5d981c3086e646646fb97c9 Parents: c3f575d Author: Jiangjie Qin <[email protected]> Authored: Fri Feb 5 09:06:47 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Fri Feb 5 09:06:59 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/server/LogOffsetMetadata.scala | 22 ++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/64d51295/core/src/main/scala/kafka/server/LogOffsetMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala index 00b60fe..6add702 100644 --- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -45,7 +45,7 @@ case class LogOffsetMetadata(messageOffset: Long, // check if this offset is already on an older segment compared with the given offset def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly()) - throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset < that.segmentBaseOffset } @@ -53,13 +53,23 @@ case class LogOffsetMetadata(messageOffset: Long, // check if this offset is on the same segment with the given offset def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly()) - throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset == that.segmentBaseOffset } - // check if this offset is before the given offset - def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset + // check if this offset is before the given offset. We need to compare both message offset and segment base offset. + def precedes(that: LogOffsetMetadata): Boolean = { + this.messageOffset < that.messageOffset || { + if (this.messageOffsetOnly() && that.messageOffsetOnly()) + false + else if (!this.messageOffsetOnly() && !that.messageOffsetOnly()) + this.segmentBaseOffset < that.segmentBaseOffset + else + throw new KafkaException(s"Cannot compare $this with $that as one has segment base offsets and the other does not.") + } + } + // compute the number of messages between this offset to the given offset def offsetDiff(that: LogOffsetMetadata): Long = { @@ -70,9 +80,9 @@ case class LogOffsetMetadata(messageOffset: Long, // if they are on the same segment and this offset precedes the given offset def positionDiff(that: LogOffsetMetadata): Int = { if(!offsetOnSameSegment(that)) - throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment") if(messageOffsetOnly()) - throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that)) + throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info") this.relativePositionInSegment - that.relativePositionInSegment }
