Fatal error during KafkaServerStable startup when a hard-failed broker is restarted; patched by Swapnil Ghike; reviewed by Jun Rao and Jay Kreps; KAFKA-757
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e81b0a3d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e81b0a3d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e81b0a3d Branch: refs/heads/trunk Commit: e81b0a3ded9ff31f5089f2a3f294fbe0aef9614b Parents: 48745f0 Author: Jun Rao <[email protected]> Authored: Wed Feb 13 14:01:57 2013 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Feb 13 14:01:57 2013 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 11 ++++++++++- core/src/main/scala/kafka/log/OffsetIndex.scala | 18 ++++++++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e81b0a3d/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index eee0ed3..d0b26ab 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -192,7 +192,16 @@ private[kafka] class Log(val dir: File, if(needsRecovery) recoverSegment(logSegments.get(logSegments.size - 1)) } - new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size))) + + val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size)) + // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset. 
+ for (s <- segmentList) { + require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, + "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" + .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) + } + + new SegmentList(segmentList) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/e81b0a3d/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 0d67242..e806da9 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -92,7 +92,6 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) - require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset)) /* the maximum number of entries this index can hold */ def maxEntries = mmap.limit / 8 @@ -130,7 +129,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = * Return -1 if the least entry in the index is larger than the target offset or the index is empty */ private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = { - // we only store the difference from the baseoffset so calculate that + // we only store the difference from the base offset so calculate that val relOffset = targetOffset - baseOffset // check if the index is empty @@ -197,7 +196,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = /** * Truncate the entire index */ - def truncate() = 
truncateTo(this.baseOffset) + def truncate() = truncateToEntries(0) /** * Remove all entries from the index which have an offset greater than or equal to the given offset. @@ -220,11 +219,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = slot else slot + 1 - this.size.set(newEntries) - mmap.position(this.size.get * 8) - this.lastOffset = readLastOffset + truncateToEntries(newEntries) } } + + /** + * Truncates index to a known number of entries. + */ + private def truncateToEntries(entries: Int) { + this.size.set(entries) + mmap.position(this.size.get * 8) + this.lastOffset = readLastOffset + } /** * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
