Repository: kafka
Updated Branches:
  refs/heads/trunk cda1f73d0 -> 35f589bb4
KAFKA-1641; Reset first dirty offset for compaction to earliest offset if the checkpointed offset is invalid; reviewed by Joel Koshy Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35f589bb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35f589bb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35f589bb Branch: refs/heads/trunk Commit: 35f589bb4654b49035c27780717f560e74400444 Parents: cda1f73 Author: Guozhang Wang <[email protected]> Authored: Thu Oct 23 15:42:10 2014 -0700 Committer: Joel Koshy <[email protected]> Committed: Thu Oct 23 15:42:48 2014 -0700 ---------------------------------------------------------------------- .../scala/kafka/log/LogCleanerManager.scala | 30 ++++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/35f589bb/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index e8ced6a..bcfef77 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -75,13 +75,31 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To def grabFilthiestLog(): Option[LogToClean] = { inLock(lock) { val lastClean = allCleanerCheckpoints() - val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe - .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress - .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each - lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset))) - .filter(l => l.totalBytes > 0) // skip any empty logs + val dirtyLogs = logs.filter { + case 
(topicAndPartition, log) => log.config.compact // skip any logs marked for delete rather than dedupe + }.filterNot { + case (topicAndPartition, log) => inProgress.contains(topicAndPartition) // skip any logs already in-progress + }.map { + case (topicAndPartition, log) => // create a LogToClean instance for each + // if the log segments are abnormally truncated and hence the checkpointed offset + // is no longer valid, reset to the log starting offset and log the error event + val logStartOffset = log.logSegments.head.baseOffset + val firstDirtyOffset = { + val offset = lastClean.getOrElse(topicAndPartition, logStartOffset) + if (offset < logStartOffset) { + error("Resetting first dirty offset to log start offset %d since the checkpointed offset %d is invalid." + .format(logStartOffset, offset)) + logStartOffset + } else { + offset + } + } + LogToClean(topicAndPartition, log, firstDirtyOffset) + }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs + this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0 - val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio + // and must meet the minimum threshold for dirty byte ratio + val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio) if(cleanableLogs.isEmpty) { None } else {
