Repository: kafka
Updated Branches:
  refs/heads/trunk ca758252c -> 16ecf9806
KAFKA-2012; Broker should automatically handle corrupt index files; patched by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/16ecf980
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/16ecf980
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/16ecf980

Branch: refs/heads/trunk
Commit: 16ecf9806b286d9510103a4426bf0901d7dc8778
Parents: ca75825
Author: Manikumar Reddy <manikumar.re...@gmail.com>
Authored: Fri Jun 19 09:34:22 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Fri Jun 19 09:34:22 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 18 ++++++++---
 .../src/test/scala/unit/kafka/log/LogTest.scala | 33 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/16ecf980/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6b9274d..e5e8007 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -169,7 +169,7 @@ class Log(val dir: File,
       } else if(filename.endsWith(LogFileSuffix)) {
         // if its a log file, load the corresponding log segment
         val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
-        val hasIndex = Log.indexFilename(dir, start).exists
+        val indexFile = Log.indexFilename(dir, start)
         val segment = new LogSegment(dir = dir,
                                      startOffset = start,
                                      indexIntervalBytes = config.indexInterval,
@@ -177,7 +177,18 @@ class Log(val dir: File,
                                      rollJitterMs = config.randomSegmentJitter,
                                      time = time,
                                      fileAlreadyExists = true)
-        if(!hasIndex) {
+
+        if(indexFile.exists()) {
+          try {
+              segment.index.sanityCheck()
+          } catch {
+            case e: java.lang.IllegalArgumentException =>
+              warn("Found a corrupted index file, %s, deleting and rebuilding index...".format(indexFile.getAbsolutePath))
+              indexFile.delete()
+              segment.recover(config.maxMessageSize)
+          }
+        }
+        else {
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
         }
@@ -223,9 +234,6 @@ class Log(val dir: File,
       activeSegment.index.resize(config.maxIndexSize)
     }
 
-    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
-    for (s <- logSegments)
-      s.index.sanityCheck()
   }
 
   private def updateLogEndOffset(messageOffset: Long) {
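For context on the catch clause above: OffsetIndex.sanityCheck() is built on Scala's require(), which throws java.lang.IllegalArgumentException on failure, so that is the exception the new load path handles before deleting and rebuilding the index. Below is a minimal sketch (the object and parameter names are illustrative, not Kafka's actual code) of the invariants such a check can enforce, assuming the 8-byte entry layout of Kafka's offset index:

import java.io.File

// Sketch only, not kafka.log.OffsetIndex: the kind of invariants a
// load-time index sanity check enforces. require() throws
// java.lang.IllegalArgumentException on failure, which is exactly the
// exception loadSegments now catches before rebuilding the index.
object IndexSanitySketch {
  def sanityCheck(file: File, baseOffset: Long, lastOffset: Long, entries: Int) {
    // a non-empty index must end past its base offset
    require(entries == 0 || lastOffset > baseOffset,
      "Corrupt index %s: last offset %d <= base offset %d"
        .format(file.getAbsolutePath, lastOffset, baseOffset))
    // each entry is 8 bytes (4-byte relative offset + 4-byte file position),
    // so a valid index file length is always a multiple of 8
    require(file.length % 8 == 0,
      "Index file %s has size %d, which is not a multiple of 8"
        .format(file.getAbsolutePath, file.length))
  }
}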
http://git-wip-us.apache.org/repos/asf/kafka/blob/16ecf980/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a8e57c2..9e26190 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -489,6 +489,39 @@ class LogTest extends JUnitSuite {
   }
 
   /**
+   * Test that if we have corrupted an index segment it is rebuilt when the log is re-opened
+   */
+  @Test
+  def testCorruptIndexRebuild() {
+    // publish the messages and close the log
+    val numMessages = 200
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val config = LogConfig(logProps)
+    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+    val indexFiles = log.logSegments.map(_.index.file)
+    log.close()
+
+    // corrupt all the index files
+    for(file <- indexFiles) {
+      val bw = new BufferedWriter(new FileWriter(file))
+      bw.write(" ")
+      bw.close()
+    }
+
+    // reopen the log
+    log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
+    assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
+    for(i <- 0 until numMessages)
+      assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+    log.close()
+  }
+
+  /**
    * Test the Log truncate operations
    */
   @Test
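A note on the corruption step in the test: overwriting each index with a single space leaves a one-byte file, which can never satisfy the multiple-of-8 length invariant sketched earlier, so every reopened segment takes the rebuild path. A standalone illustration (CorruptIndexDemo is a hypothetical name, not part of this patch):

import java.io.{BufferedWriter, File, FileWriter}

object CorruptIndexDemo extends App {
  // hypothetical temp file standing in for a segment's .index file
  val index = File.createTempFile("00000000000000000000", ".index")
  val bw = new BufferedWriter(new FileWriter(index))
  bw.write(" ")  // the same one-byte overwrite the test performs
  bw.close()
  // 1 byte is not a multiple of the 8-byte entry size, so a sanity check
  // fails and the broker rebuilds the index via segment.recover()
  println("size = %d bytes, corrupt = %b".format(index.length, index.length % 8 != 0))
  index.delete()
}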