KAFKA-670 Clean spurious .index files; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3244bcaf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3244bcaf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3244bcaf Branch: refs/heads/trunk Commit: 3244bcafe4233611fc0366f9df53ec98b44ca571 Parents: f9702c6 Author: Edward Jay Kreps <jkr...@apache.org> Authored: Thu Dec 13 10:15:32 2012 -0800 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Thu Dec 13 10:15:32 2012 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 31 +++++++++++------ core/src/test/scala/unit/kafka/log/LogTest.scala | 26 ++++++++++++++ 2 files changed, 46 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3244bcaf/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 96bf2ed..66c07af 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -142,18 +142,27 @@ private[kafka] class Log(val dir: File, val logSegments = new ArrayList[LogSegment] val ls = dir.listFiles() if(ls != null) { - for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) { - if(!file.canRead) - throw new IOException("Could not read file " + file) + for(file <- ls if file.isFile) { val filename = file.getName() - val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong - // TODO: we should ideally rebuild any missing index files, instead of erroring out - if(!Log.indexFilename(dir, start).exists) - throw new IllegalStateException("Found log file with no corresponding index file.") - logSegments.add(new LogSegment(dir = dir, - startOffset = start, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) + if(!file.canRead) { + throw new IOException("Could not read file " + file) + } else if(filename.endsWith(IndexFileSuffix)) { + // ensure that we have a corresponding log file for this index file + val log = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) + if(!log.exists) { + warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) + file.delete() + } + } else if(filename.endsWith(LogFileSuffix)) { + val offset = filename.substring(0, filename.length - LogFileSuffix.length).toLong + // TODO: we should ideally rebuild any missing index files, instead of erroring out + if(!Log.indexFilename(dir, offset).exists) + throw new IllegalStateException("Found log file with no corresponding index file.") + logSegments.add(new LogSegment(dir = dir, + startOffset = offset, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize)) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3244bcaf/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index afaa284..900d0e2 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -391,6 +391,32 @@ class LogTest extends JUnitSuite { log.delete() } } + + /** + * When we open a log any index segments without an associated log segment should be deleted. + */ + @Test + def testBogusIndexSegmentsAreRemoved() { + val bogusIndex1 = Log.indexFilename(logDir, 0) + val bogusIndex2 = Log.indexFilename(logDir, 5) + + val set = TestUtils.singleMessageSet("test".getBytes()) + val log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, + maxMessageSize = config.maxMessageSize, + maxIndexSize = 1000, + indexIntervalBytes = 1, + needsRecovery = false) + + assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) + assertFalse("The second index file should have been deleted.", bogusIndex2.exists) + + // check that we can append to the log + for(i <- 0 until 10) + log.append(set) + + log.delete() + } @Test def testReopenThenTruncate() {