Repository: kafka
Updated Branches:
  refs/heads/trunk ba3e08958 -> dbfe8c0a7
kafka-2118; Cleaner cannot clean after shutdown during replaceSegments; patched by Rajini Sivaram; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbfe8c0a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbfe8c0a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbfe8c0a

Branch: refs/heads/trunk
Commit: dbfe8c0a7dfea65e9f32e6157da1c9a3ce256171
Parents: ba3e089
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sun Apr 26 19:17:15 2015 -0500
Committer: Jun Rao <jun...@gmail.com>
Committed: Sun Apr 26 19:17:15 2015 -0500

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 55 +++++++++--
 .../test/scala/unit/kafka/log/CleanerTest.scala | 99 ++++++++++++++++++++
 2 files changed, 144 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
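Before the diff, it may help to see the rename protocol that the patch pins down, in isolation. The sketch below is illustrative only: the object and helper names are invented, and the suffix literals correspond to the Log.CleanedFileSuffix, Log.SwapFileSuffix and Log.DeletedFileSuffix constants used in the patch; it is not the Kafka implementation, which follows.

    import java.io.File

    // A minimal sketch of the crash-safe swap sequence this patch enforces.
    // Names here are hypothetical; only the suffixes mirror Log.scala.
    object SwapSequenceSketch {
      val CleanedSuffix = ".cleaned"
      val SwapSuffix    = ".swap"
      val DeletedSuffix = ".deleted"

      private def rename(from: File, to: File): Unit =
        if (!from.renameTo(to)) sys.error(s"failed to rename $from to $to")

      /** Replace `oldFiles` with the compacted `cleaned` file, crash-safe at every step. */
      def replace(cleaned: File, oldFiles: Seq[File]): Unit = {
        val base = new File(cleaned.getPath.stripSuffix(CleanedSuffix))
        val swap = new File(base.getPath + SwapSuffix)

        // .cleaned -> .swap: a crash before this point leaves only a .cleaned file,
        // which recovery deletes (the clean is aborted, old segments stay intact)
        rename(cleaned, swap)

        // old files -> .deleted *before* the swap file goes live: a crash here is
        // resumable, because the .swap file records what replaces the old segments
        for (old <- oldFiles)
          rename(old, new File(old.getPath + DeletedSuffix))

        // .swap -> live name: any leftover .deleted files are swept on recovery
        rename(swap, base)
      }
    }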
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbfe8c0a/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
old mode 100755
new mode 100644
index 5563f2d..84e7b8f
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -122,9 +122,10 @@ class Log(val dir: File,
   private def loadSegments() {
     // create the log directory if it doesn't exist
     dir.mkdirs()
+    var swapFiles = Set[File]()
 
     // first do a pass through the files in the log directory and remove any temporary files
-    // and complete any interrupted swap operations
+    // and find any interrupted swap operations
     for(file <- dir.listFiles if file.isFile) {
       if(!file.canRead)
         throw new IOException("Could not read file " + file)
@@ -134,7 +135,7 @@ class Log(val dir: File,
         file.delete()
       } else if(filename.endsWith(SwapFileSuffix)) {
         // we crashed in the middle of a swap operation, to recover:
-        // if a log, swap it in and delete the .index file
+        // if a log, delete the .index file, complete the swap operation later
         // if an index just delete it, it will be rebuilt
         val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
         if(baseName.getPath.endsWith(IndexFileSuffix)) {
@@ -143,12 +144,7 @@ class Log(val dir: File,
           // delete the index
           val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
           index.delete()
-          // complete the swap operation
-          val renamed = file.renameTo(baseName)
-          if(renamed)
-            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
-          else
-            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
+          swapFiles += file
         }
       }
     }
@@ -180,6 +176,27 @@ class Log(val dir: File,
         segments.put(start, segment)
       }
     }
+
+    // Finally, complete any interrupted swap operations. To be crash-safe,
+    // log files that are replaced by the swap segment should be renamed to .deleted
+    // before the swap file is restored as the new segment file.
+    for (swapFile <- swapFiles) {
+      val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
+      val fileName = logFile.getName
+      val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
+      val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
+      val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
+                                       index = index,
+                                       baseOffset = startOffset,
+                                       indexIntervalBytes = config.indexInterval,
+                                       rollJitterMs = config.randomSegmentJitter,
+                                       time = time)
+      info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
+      swapSegment.recover(config.maxMessageSize)
+      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
+      replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
+    }
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment beginning at offset 0
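A brief worked example of the offset arithmetic in the recovery loop above; the file name is invented, and the snippet is script-style Scala, not code from the patch:

    // given an interrupted swap file left on disk:
    val swapName    = "00000000000000000042.log.swap"
    val logName     = swapName.stripSuffix(".swap")      // "00000000000000000042.log"
    val startOffset = logName.stripSuffix(".log").toLong // 42
    // logSegments(startOffset, swapSegment.nextOffset) then yields exactly the
    // existing segments whose offsets the recovered swap segment supersedes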
@@ -748,14 +765,32 @@ class Log(val dir: File,
    * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
    * be asynchronously deleted.
    *
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is aborted and
+   *        the .cleaned file is deleted on recovery in loadSegments().
+   *   <li> New segment is renamed to .swap. If the broker crashes after this point before the whole
+   *        operation is completed, the swap operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not renamed before the crash.
+   *   <li> Swap segment is renamed to replace the existing segment, completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   *
    * @param newSegment The new log segment to add to the log
    * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
-  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment]) {
+  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile : Boolean = false) {
     lock synchronized {
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
-      newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
+      if (!isRecoveredSwapFile)
+        newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
       addSegment(newSegment)
 
       // delete the old files
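A note on the new isRecoveredSwapFile flag above: on the recovery path the segment passed to replaceSegments() was opened from the .swap file itself, so no .cleaned file exists on disk and the unconditional changeFileSuffixes() call would fail. A sketch of the guard, with Segment standing in for kafka.log.LogSegment and swapIn a hypothetical name:

    // Illustrative only; names other than changeFileSuffixes are invented.
    object SwapGuardSketch {
      trait Segment {
        def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit
      }

      def swapIn(newSegment: Segment, isRecoveredSwapFile: Boolean): Unit = {
        // Normal cleaning: the new segment's files still carry .cleaned and must
        // be renamed to .swap first, so a crash from here on is resumable.
        if (!isRecoveredSwapFile)
          newSegment.changeFileSuffixes(".cleaned", ".swap")
        // Recovery: the files already carry .swap, so the rename is skipped.
        // The rest of the swap (addSegment, renaming old files to .deleted,
        // scheduling the async delete) is shared by both callers.
      }
    }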
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbfe8c0a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 9792ed6..8b8249a 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -275,6 +275,105 @@ class CleanerTest extends JUnitSuite {
     checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
   }
 
+  /**
+   * Tests recovery if broker crashes at the following stages during the cleaning sequence
+   * <ol>
+   *   <li> Cleaner has created .cleaned log containing multiple segments, swap sequence not yet started
+   *   <li> .cleaned log renamed to .swap, old segment files not yet renamed to .deleted
+   *   <li> .cleaned log renamed to .swap, old segment files renamed to .deleted, but not yet deleted
+   *   <li> .swap suffix removed, completing the swap, but async delete of .deleted files not yet complete
+   * </ol>
+   */
+  @Test
+  def testRecoveryAfterCrash() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val config = logConfig.copy(segmentSize = 300, indexInterval = 1, fileDeleteDelayMs = 10)
+
+    def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
+      // Recover log file and check that after recovery, keys are as expected
+      // and all temporary files have been deleted
+      val recoveredLog = makeLog(config = config)
+      time.sleep(config.fileDeleteDelayMs + 1)
+      for (file <- dir.listFiles) {
+        assertFalse("Unexpected .deleted file after recovery", file.getName.endsWith(Log.DeletedFileSuffix))
+        assertFalse("Unexpected .cleaned file after recovery", file.getName.endsWith(Log.CleanedFileSuffix))
+        assertFalse("Unexpected .swap file after recovery", file.getName.endsWith(Log.SwapFileSuffix))
+      }
+      assertEquals(expectedKeys, keysInLog(recoveredLog))
+      recoveredLog
+    }
+
+    // create a log and append some messages
+    var log = makeLog(config = config)
+    var messageCount = 0
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    val allKeys = keysInLog(log)
+
+    // pretend we have odd-numbered keys
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)
+
+    // clean the log
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    var cleanedKeys = keysInLog(log)
+
+    // 1) Simulate recovery just after .cleaned file is created, before rename to .swap.
+    //    On recovery, clean operation is aborted. All messages should be present in the log
+    log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
+    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
+      file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    }
+    log = recoverAndCheck(config, allKeys)
+
+    // clean again
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    cleanedKeys = keysInLog(log)
+
+    // 2) Simulate recovery just after swap file is created, before old segment files are
+    //    renamed to .deleted. Clean operation is resumed during recovery.
+    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
+      file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    }
+    log = recoverAndCheck(config, cleanedKeys)
+
+    // add some more messages and clean the log again
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    cleanedKeys = keysInLog(log)
+
+    // 3) Simulate recovery after swap file is created and old segment files are renamed
+    //    to .deleted. Clean operation is resumed during recovery.
+    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    log = recoverAndCheck(config, cleanedKeys)
+
+    // add some more messages and clean the log again
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
+    cleanedKeys = keysInLog(log)
+
+    // 4) Simulate recovery after swap is complete, but async deletion
+    //    is not yet complete. Clean operation is resumed during recovery.
+    recoverAndCheck(config, cleanedKeys)
+
+  }
+
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
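For reference, the approximate on-disk state at each crash point the test simulates. The listing is illustrative: file names are abbreviated, offsets are invented, and each .log shown has a matching .index that follows the same suffix renames.

    1) before rename to .swap:
         000...000.log ... 000...270.log  000...000.log.cleaned
       recovery deletes the .cleaned file; all original messages survive
    2) after rename to .swap, before old segments are renamed:
         000...000.log ... 000...270.log  000...000.log.swap
       recovery renames the old segments to .deleted and completes the swap
    3) after old segments are renamed to .deleted:
         000...000.log.deleted ... 000...270.log.deleted  000...000.log.swap
       recovery completes the swap, then the .deleted files are removed
    4) after the swap completes, before the async delete runs:
         000...000.log  000...000.log.deleted ... 000...270.log.deleted
       recovery only has to sweep the leftover .deleted files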