kafka-866; Recover segment does shallow iteration to fix index causing inconsistencies; patched by Sriram Subramanian; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1d2141c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1d2141c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1d2141c

Branch: refs/heads/trunk
Commit: f1d2141ca065d51b41aeac8c7e2124672298d5f0
Parents: c5bb1d4
Author: Sriram Subramanian <[email protected]>
Authored: Mon Apr 22 16:50:29 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Apr 22 16:50:29 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1d2141c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e38b95c..ef708e2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -218,7 +218,15 @@ private[kafka] class Log(val dir: File,
         val entry = iter.next
         entry.message.ensureValid()
         if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          segment.index.append(entry.offset, validBytes)
+          // we need to decompress the message, if required, to get the offset of the first uncompressed message
+          val startOffset =
+            entry.message.compressionCodec match {
+              case NoCompressionCodec =>
+                entry.offset
+              case _ =>
+                ByteBufferMessageSet.decompress(entry.message).head.offset
+            }
+          segment.index.append(startOffset, validBytes)
           lastIndexEntry = validBytes
         }
         validBytes += MessageSet.entrySize(entry.message)
