Repository: kafka
Updated Branches:
  refs/heads/trunk ca758252c -> 16ecf9806


KAFKA-2012; Broker should automatically handle corrupt index files; patched by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/16ecf980
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/16ecf980
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/16ecf980

Branch: refs/heads/trunk
Commit: 16ecf9806b286d9510103a4426bf0901d7dc8778
Parents: ca75825
Author: Manikumar Reddy <manikumar.re...@gmail.com>
Authored: Fri Jun 19 09:34:22 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Fri Jun 19 09:34:22 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 18 ++++++++---
 .../src/test/scala/unit/kafka/log/LogTest.scala | 33 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/16ecf980/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6b9274d..e5e8007 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -169,7 +169,7 @@ class Log(val dir: File,
       } else if(filename.endsWith(LogFileSuffix)) {
         // if its a log file, load the corresponding log segment
         val start = filename.substring(0, filename.length - 
LogFileSuffix.length).toLong
-        val hasIndex = Log.indexFilename(dir, start).exists
+        val indexFile = Log.indexFilename(dir, start)
         val segment = new LogSegment(dir = dir, 
                                      startOffset = start,
                                      indexIntervalBytes = 
config.indexInterval, 
@@ -177,7 +177,18 @@ class Log(val dir: File,
                                      rollJitterMs = config.randomSegmentJitter,
                                      time = time,
                                      fileAlreadyExists = true)
-        if(!hasIndex) {
+
+        if(indexFile.exists()) {
+          try {
+              segment.index.sanityCheck()
+          } catch {
+            case e: java.lang.IllegalArgumentException =>
+              warn("Found an corrupted index file, %s, deleting and rebuilding 
index...".format(indexFile.getAbsolutePath))
+              indexFile.delete()
+              segment.recover(config.maxMessageSize)
+          }
+        }
+        else {
           error("Could not find index file corresponding to log file %s, 
rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
         }
@@ -223,9 +234,6 @@ class Log(val dir: File,
       activeSegment.index.resize(config.maxIndexSize)
     }
 
-    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
-    for (s <- logSegments)
-      s.index.sanityCheck()
   }
 
   private def updateLogEndOffset(messageOffset: Long) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/16ecf980/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a8e57c2..9e26190 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -489,6 +489,39 @@ class LogTest extends JUnitSuite {
   }
 
   /**
+   * Test that if we have corrupted an index segment it is rebuilt when the log is re-opened
+   */
+  @Test
+  def testCorruptIndexRebuild() {
+    // publish the messages and close the log
+    val numMessages = 200
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val config = LogConfig(logProps)
+    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+    val indexFiles = log.logSegments.map(_.index.file)
+    log.close()
+
+    // corrupt all the index files
+    for( file <- indexFiles) {
+      val bw = new BufferedWriter(new FileWriter(file))
+      bw.write("  ")
+      bw.close()
+    }
+
+    // reopen the log
+    log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
+    assertEquals("Should have %d messages when log is 
reopened".format(numMessages), numMessages, log.logEndOffset)
+    for(i <- 0 until numMessages)
+      assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+    log.close()
+  }
+
+  /**
    * Test the Log truncate operations
    */
   @Test

Reply via email to