Repository: kafka Updated Branches: refs/heads/0.10.1 dce2e3f59 -> 5b869e7ee
KAFKA-4497: LogCleaner appended the wrong offset to time index. Backport the fix to 0.10.1. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b869e7e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b869e7e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b869e7e Branch: refs/heads/0.10.1 Commit: 5b869e7ee4c6b554787330c386988b5447b127ca Parents: dce2e3f Author: Jiangjie Qin <[email protected]> Authored: Sun Dec 11 17:32:05 2016 -0800 Committer: Jiangjie Qin <[email protected]> Committed: Sun Dec 11 17:32:05 2016 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogCleaner.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5b869e7e/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 219957f..e0b0bb8 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -494,14 +494,15 @@ private[log] class Cleaner(val id: Int, writeOriginalMessageSet = false retainedMessages += deepMessageAndOffset - // We need the max timestamp and last offset for time index - if (deepMessageAndOffset.message.timestamp > maxTimestamp) + // We need the max timestamp and message offset for time index + if (deepMessageAndOffset.message.timestamp > maxTimestamp) { maxTimestamp = deepMessageAndOffset.message.timestamp + offsetOfMaxTimestamp = deepMessageAndOffset.offset + } } else { writeOriginalMessageSet = false } } - offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L // There are no messages compacted out and no message format conversion, write the original message set back if (writeOriginalMessageSet) ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset) @@ -758,7 +759,7 @@ private case class CleanerStats(time: Time = SystemTime) { def elapsedSecs = (endTime - startTime)/1000.0 def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0 - + def clear() { startTime = time.milliseconds mapCompleteTime = -1L
