Repository: kafka
Updated Branches:
  refs/heads/trunk ba3e08958 -> dbfe8c0a7


KAFKA-2118; Cleaner cannot clean after shutdown during replaceSegments; patched by Rajini Sivaram; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbfe8c0a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbfe8c0a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbfe8c0a

Branch: refs/heads/trunk
Commit: dbfe8c0a7dfea65e9f32e6157da1c9a3ce256171
Parents: ba3e089
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sun Apr 26 19:17:15 2015 -0500
Committer: Jun Rao <jun...@gmail.com>
Committed: Sun Apr 26 19:17:15 2015 -0500

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 55 +++++++++--
 .../test/scala/unit/kafka/log/CleanerTest.scala | 99 ++++++++++++++++++++
 2 files changed, 144 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
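In outline: loadSegments() now collects .swap files during its initial directory scan and completes the interrupted swaps only after all regular segments have been loaded, and replaceSegments() documents the crash-safe rename ordering it relies on. A minimal sketch of that ordering follows; the replaceSuffix helper below is an illustrative stand-in for CoreUtils.replaceSuffix, not the actual kafka.log API:

    import java.io.File

    object SwapRecoverySketch {
      // Suffixes mirroring those in kafka.log.Log
      val CleanedFileSuffix = ".cleaned" // cleaner output; discarded if we crash before the swap starts
      val SwapFileSuffix    = ".swap"    // swap in progress; recovery completes it
      val DeletedFileSuffix = ".deleted" // scheduled for asynchronous deletion

      // Illustrative stand-in for CoreUtils.replaceSuffix
      def replaceSuffix(path: String, oldSuffix: String, newSuffix: String): String = {
        require(path.endsWith(oldSuffix), s"$path does not end with $oldSuffix")
        path.dropRight(oldSuffix.length) + newSuffix
      }

      // Crash-safe ordering: old segments become .deleted *before* the .swap
      // suffix is removed, so a crash at any point leaves a recoverable state.
      def completeSwap(swapFile: File, oldSegments: Seq[File]): Unit = {
        for (old <- oldSegments)
          old.renameTo(new File(old.getPath + DeletedFileSuffix))
        swapFile.renameTo(new File(replaceSuffix(swapFile.getPath, SwapFileSuffix, "")))
      }
    }

A crash between the two rename steps leaves both a .swap file and .deleted files behind, which recovery can then clean up unambiguously.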


http://git-wip-us.apache.org/repos/asf/kafka/blob/dbfe8c0a/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
old mode 100755
new mode 100644
index 5563f2d..84e7b8f
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -122,9 +122,10 @@ class Log(val dir: File,
   private def loadSegments() {
     // create the log directory if it doesn't exist
     dir.mkdirs()
+    var swapFiles = Set[File]()
     
     // first do a pass through the files in the log directory and remove any temporary files
-    // and complete any interrupted swap operations
+    // and find any interrupted swap operations
     for(file <- dir.listFiles if file.isFile) {
       if(!file.canRead)
         throw new IOException("Could not read file " + file)
@@ -134,7 +135,7 @@ class Log(val dir: File,
         file.delete()
       } else if(filename.endsWith(SwapFileSuffix)) {
         // we crashed in the middle of a swap operation, to recover:
-        // if a log, swap it in and delete the .index file
+        // if a log, delete the .index file, complete the swap operation later
         // if an index just delete it, it will be rebuilt
         val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
         if(baseName.getPath.endsWith(IndexFileSuffix)) {
@@ -143,12 +144,7 @@ class Log(val dir: File,
           // delete the index
           val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
           index.delete()
-          // complete the swap operation
-          val renamed = file.renameTo(baseName)
-          if(renamed)
-            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
-          else
-            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
+          swapFiles += file
         }
       }
     }
@@ -180,6 +176,27 @@ class Log(val dir: File,
         segments.put(start, segment)
       }
     }
+    
+    // Finally, complete any interrupted swap operations. To be crash-safe,
+    // log files that are replaced by the swap segment should be renamed to .deleted
+    // before the swap file is restored as the new segment file.
+    for (swapFile <- swapFiles) {
+      val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
+      val fileName = logFile.getName
+      val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
+      val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
+      val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
+                                       index = index,
+                                       baseOffset = startOffset,
+                                       indexIntervalBytes = config.indexInterval,
+                                       rollJitterMs = config.randomSegmentJitter,
+                                       time = time)
+      info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
+      swapSegment.recover(config.maxMessageSize)
+      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
+      replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
+    }
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
@@ -748,14 +765,32 @@ class Log(val dir: File,
    * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
    * be asynchronously deleted.
    * 
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is aborted and
+   *        the .cleaned file is deleted on recovery in loadSegments().
+   *   <li> New segment is renamed .swap. If the broker crashes after this point before the whole
+   *        operation is completed, the swap operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not renamed before the crash.
+   *   <li> Swap segment is renamed to replace the existing segment, completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   * 
    * @param newSegment The new log segment to add to the log
    * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
-  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment]) {
+  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
-      newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
+      if (!isRecoveredSwapFile)
+        newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
       addSegment(newSegment)
         
       // delete the old files
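
Taken together with the loadSegments() changes above, the four-step sequence implies a simple per-suffix decision on restart. A sketch of that decision table (illustrative only, with suffix literals assumed to match Log's CleanedFileSuffix/SwapFileSuffix/DeletedFileSuffix constants; not the actual loadSegments() code):

    // Sketch of the per-file recovery decision implied by the sequence above
    // (not the actual Log.loadSegments() code).
    def recoveryAction(fileName: String): String = fileName match {
      case f if f.endsWith(".deleted")    => "delete now: the interrupted async delete simply finishes"
      case f if f.endsWith(".cleaned")    => "delete now: the swap never started, so the clean is aborted"
      case f if f.endsWith(".index.swap") => "delete now: the index is rebuilt from the .log.swap segment"
      case f if f.endsWith(".log.swap")   => "recover the segment, then replaceSegments(..., isRecoveredSwapFile = true)"
      case _                              => "load as a normal segment"
    }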

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbfe8c0a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 9792ed6..8b8249a 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -275,6 +275,105 @@ class CleanerTest extends JUnitSuite {
     checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
   }
   
+  
+  /**
+   * Tests recovery if broker crashes at the following stages during the cleaning sequence
+   * <ol>
+   *   <li> Cleaner has created .cleaned log containing multiple segments, swap sequence not yet started
+   *   <li> .cleaned log renamed to .swap, old segment files not yet renamed to .deleted
+   *   <li> .cleaned log renamed to .swap, old segment files renamed to .deleted, but not yet deleted
+   *   <li> .swap suffix removed, completing the swap, but async delete of .deleted files not yet complete
+   * </ol>
+   */
+  @Test
+  def testRecoveryAfterCrash() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val config = logConfig.copy(segmentSize = 300, indexInterval = 1, fileDeleteDelayMs = 10)
+      
+    def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Int]): Log = {
+      // Recover log file and check that after recovery, keys are as expected
+      // and all temporary files have been deleted
+      val recoveredLog = makeLog(config = config)
+      time.sleep(config.fileDeleteDelayMs + 1)
+      for (file <- dir.listFiles) {
+        assertFalse("Unexpected .deleted file after recovery", 
file.getName.endsWith(Log.DeletedFileSuffix))
+        assertFalse("Unexpected .cleaned file after recovery", 
file.getName.endsWith(Log.CleanedFileSuffix))
+        assertFalse("Unexpected .swap file after recovery", 
file.getName.endsWith(Log.SwapFileSuffix))
+      }
+      assertEquals(expectedKeys, keysInLog(recoveredLog))
+      recoveredLog
+    }
+    
+    // create a log and append some messages
+    var log = makeLog(config = config)
+    var messageCount = 0
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    val allKeys = keysInLog(log)
+    
+    // pretend we have odd-numbered keys
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)
+     
+    // clean the log
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    var cleanedKeys = keysInLog(log)
+    
+    // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
+    //    On recovery, clean operation is aborted. All messages should be present in the log
+    log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
+    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
+      file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    }
+    log = recoverAndCheck(config, allKeys)
+    
+    // clean again
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    cleanedKeys = keysInLog(log)
+    
+    // 2) Simulate recovery just after swap file is created, before old segment files are
+    //    renamed to .deleted. Clean operation is resumed during recovery.
+    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
+      file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    }
+    log = recoverAndCheck(config, cleanedKeys)
+    
+    // add some more messages and clean the log again
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)    
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    cleanedKeys = keysInLog(log)
+    
+    // 3) Simulate recovery after swap file is created and old segment files are renamed
+    //    to .deleted. Clean operation is resumed during recovery.
+    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    log = recoverAndCheck(config, cleanedKeys)
+    
+    // add some more messages and clean the log again
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)    
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    cleanedKeys = keysInLog(log)
+    
+    // 4) Simulate recovery after swap is complete, but async deletion
+    //    is not yet complete. Clean operation is resumed during recovery.
+    recoverAndCheck(config, cleanedKeys)
+    
+  }
+  
+  
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
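
The recurring pattern in testRecoveryAfterCrash() — rename files to mimic a crash mid-sequence, then reopen the log and verify — can be distilled as below. This is a sketch assuming the CleanerTest fixtures shown in this file (dir, makeLog, keysInLog) and JUnit's assertEquals:

    // Condensed sketch of the crash-simulation pattern used in testRecoveryAfterCrash
    // (assumes the CleanerTest fixtures: dir, makeLog, keysInLog).
    def simulateCrashAfterSwapRename(log: Log, config: LogConfig, expectedKeys: Iterable[Int]): Log = {
      // pretend the broker died right after the cleaned segment was renamed to .swap
      log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
      // reopening the log runs loadSegments(), which should complete the swap
      val recovered = makeLog(config = config)
      assertEquals(expectedKeys, keysInLog(recovered))
      recovered
    }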
 
