Repository: kafka
Updated Branches:
  refs/heads/1.0 963733cd0 -> d3a603cb9
KAFKA-5829; Only delete producer snapshots before the recovery point

Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #4023 from ijuma/kafka-5829-avoid-reading-older-segments-on-hard-shutdown

(cherry picked from commit 91517e8fbd7767ba6d7f43b517f5a26b6f870585)
Signed-off-by: Jason Gustafson <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3a603cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3a603cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3a603cb

Branch: refs/heads/1.0
Commit: d3a603cb90fc4a0ac248569694f1f15c6e0dfbf1
Parents: 963733c
Author: Ismael Juma <[email protected]>
Authored: Fri Oct 6 16:45:45 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Fri Oct 6 16:55:20 2017 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherThread.scala | 1 +
 core/src/main/scala/kafka/log/Log.scala | 202 +++++++++++--------
 core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
 .../scala/kafka/log/LogCleanerManager.scala | 9 +-
 core/src/main/scala/kafka/log/LogManager.scala | 93 +++++----
 .../scala/kafka/log/ProducerStateManager.scala | 59 +++---
 .../kafka/server/AbstractFetcherThread.scala | 4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
 .../kafka/server/ReplicaFetcherThread.scala | 1 +
 .../scala/kafka/server/ReplicaManager.scala | 2 +-
 .../checkpoints/OffsetCheckpointFile.scala | 6 +-
 .../scala/kafka/tools/DumpLogSegments.scala | 4 +-
 .../scala/unit/kafka/log/LogManagerTest.scala | 40 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 200 +++++++++++++-----
 .../kafka/log/ProducerStateManagerTest.scala | 2 +-
 .../server/AbstractFetcherThreadTest.scala | 1 +
 .../server/HighwatermarkPersistenceTest.scala | 4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala | 10 +-
 .../unit/kafka/server/ServerShutdownTest.scala | 3 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 6 +- 20 files changed, 400 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 4c7c227..1670fd0 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -21,6 +21,7 @@ import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest import kafka.cluster.BrokerEndPoint import kafka.message.ByteBufferMessageSet import kafka.server.{AbstractFetcherThread, PartitionFetchState} +import AbstractFetcherThread.ResultWithPartitions import kafka.common.{ErrorMapping, TopicAndPartition} import scala.collection.Map http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d397ca6..a6f76ab 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -192,14 +192,14 @@ class Log(@volatile var dir: File, locally { val startMs = time.milliseconds - loadSegments() + val nextOffset = loadSegments() /* Calculate the offset of the next message */ - nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset, activeSegment.size) + nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset) - logStartOffset = math.max(logStartOffset, 
segments.firstEntry().getValue.baseOffset) + logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset) // The earliest leader epoch may not be flushed during a hard failure. Recover it here. leaderEpochCache.clearAndFlushEarliest(logStartOffset) @@ -256,10 +256,10 @@ class Log(@volatile var dir: File, var swapFiles = Set[File]() for (file <- dir.listFiles if file.isFile) { - if(!file.canRead) + if (!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName - if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { + if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { // if the file ends in .deleted or .cleaned, delete it Files.deleteIfExists(file.toPath) } else if(filename.endsWith(SwapFileSuffix)) { @@ -271,7 +271,7 @@ class Log(@volatile var dir: File, Files.deleteIfExists(file.toPath) } else if (isLogFile(baseFile)) { // delete the index files - val offset = offsetFromFilename(baseFile.getName) + val offset = offsetFromFile(baseFile) Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath) Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath) Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath) @@ -287,10 +287,9 @@ class Log(@volatile var dir: File, // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { - val filename = file.getName if (isIndexFile(file)) { // if it is an index file, make sure it has a corresponding .log file - val offset = offsetFromFilename(filename) + val offset = offsetFromFile(file) val logFile = Log.logFile(dir, offset) if (!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) @@ -298,7 +297,7 @@ class Log(@volatile var dir: File, } } else if (isLogFile(file)) { // if it's a log file, load the 
corresponding log segment - val startOffset = offsetFromFilename(filename) + val startOffset = offsetFromFile(file) val indexFile = Log.offsetIndexFile(dir, startOffset) val timeIndexFile = Log.timeIndexFile(dir, startOffset) val txnIndexFile = Log.transactionIndexFile(dir, startOffset) @@ -334,7 +333,7 @@ class Log(@volatile var dir: File, error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) recoverSegment(segment) } - segments.put(startOffset, segment) + addSegment(segment) } } } @@ -349,6 +348,11 @@ class Log(@volatile var dir: File, loadProducersFromLog(stateManager, fetchDataInfo.records) } stateManager.updateMapEndOffset(segment.baseOffset) + + // take a snapshot for the first recovered segment to avoid reloading all the segments if we shutdown before we + // checkpoint the recovery point + stateManager.takeSnapshot() + val bytesTruncated = segment.recover(stateManager, leaderEpochCache) // once we have recovered the segment's data, take a snapshot to ensure that we won't @@ -361,8 +365,7 @@ class Log(@volatile var dir: File, private def completeSwapOperations(swapFiles: Set[File]): Unit = { for (swapFile <- swapFiles) { val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) - val filename = logFile.getName - val startOffset = offsetFromFilename(filename) + val startOffset = offsetFromFile(logFile) val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix) @@ -384,9 +387,9 @@ class Log(@volatile var dir: File, } } - // Load the log segments from the log files on disk + // Load the log segments from the log files on disk and return the next offset // This method does not 
need to convert IOException to KafkaStorageException because it is only called before all logs are loaded - private def loadSegments() { + private def loadSegments(): Long = { // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles() @@ -399,59 +402,65 @@ class Log(@volatile var dir: File, // before the swap file is restored as the new segment file. completeSwapOperations(swapFiles) - if(logSegments.isEmpty) { + if (logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 - segments.put(0L, new LogSegment(dir = dir, - startOffset = 0, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = this.initFileSize, - preallocate = config.preallocate)) + addSegment(new LogSegment(dir = dir, + startOffset = 0, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = this.initFileSize, + preallocate = config.preallocate)) + 0 } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { - recoverLog() + val nextOffset = recoverLog() // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) activeSegment.timeIndex.resize(config.maxIndexSize) - } + nextOffset + } else 0 } private def updateLogEndOffset(messageOffset: Long) { nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size) } - // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded - private def recoverLog() { + /** + * Recover the log segments and return the next offset after recovery. 
+ * + * This method does not need to convert IOException to KafkaStorageException because it is only called before all + * logs are loaded. + */ + private def recoverLog(): Long = { // if we have the clean shutdown marker, skip recovery - if(hasCleanShutdownFile) { - this.recoveryPoint = activeSegment.nextOffset() - return - } - - // okay we need to actually recovery this log - val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator - while(unflushed.hasNext) { - val segment = unflushed.next - info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name)) - val truncatedBytes = - try { - recoverSegment(segment, Some(leaderEpochCache)) - } catch { - case _: InvalidOffsetException => - val startOffset = segment.baseOffset - warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " + - "creating an empty one with starting offset " + startOffset) - segment.truncateTo(startOffset) + if (!hasCleanShutdownFile) { + // okay we need to actually recovery this log + val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator + while (unflushed.hasNext) { + val segment = unflushed.next + info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name)) + val truncatedBytes = + try { + recoverSegment(segment, Some(leaderEpochCache)) + } catch { + case _: InvalidOffsetException => + val startOffset = segment.baseOffset + warn("Found invalid offset during recovery for log " + dir.getName + ". 
Deleting the corrupt segment and " + + "creating an empty one with starting offset " + startOffset) + segment.truncateTo(startOffset) + } + if (truncatedBytes > 0) { + // we had an invalid message, delete all remaining log + warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, + segment.nextOffset())) + unflushed.foreach(deleteSegment) } - if(truncatedBytes > 0) { - // we had an invalid message, delete all remaining log - warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, - segment.nextOffset())) - unflushed.foreach(deleteSegment) } } + recoveryPoint = activeSegment.nextOffset + recoveryPoint } private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { @@ -476,7 +485,7 @@ class Log(@volatile var dir: File, // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the // last two segments and the last offset. This should avoid the full scan in the case that the log needs // truncation. - val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset) + val nextLatestSegmentBaseOffset = lowerSegment(activeSegment.baseOffset).map(_.baseOffset) val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) offsetsToSnapshot.flatten.foreach { offset => producerStateManager.updateMapEndOffset(offset) @@ -531,7 +540,7 @@ class Log(@volatile var dir: File, /** * Check if we have the "clean shutdown" file */ - private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists() + private def hasCleanShutdownFile: Boolean = new File(dir.getParentFile, CleanShutdownFile).exists() /** * The number of segments in the log. 
@@ -1281,15 +1290,12 @@ class Log(@volatile var dir: File, file.delete() } - segments.lastEntry() match { - case null => - case entry => { - val seg = entry.getValue - seg.onBecomeInactiveSegment() - seg.index.trimToValidSize() - seg.timeIndex.trimToValidSize() - seg.log.trim() - } + Option(segments.lastEntry).foreach { entry => + val seg = entry.getValue + seg.onBecomeInactiveSegment() + seg.index.trimToValidSize() + seg.timeIndex.trimToValidSize() + seg.log.trim() } // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot @@ -1349,11 +1355,6 @@ class Log(@volatile var dir: File, for (segment <- logSegments(this.recoveryPoint, offset)) segment.flush() - // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain - // the snapshots from the recent segments in case we need to truncate and rebuild the producer state. - // Otherwise, we would always need to rebuild from the earliest segment. - producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset)) - lock synchronized { if (offset > this.recoveryPoint) { this.recoveryPoint = offset @@ -1363,17 +1364,42 @@ class Log(@volatile var dir: File, } } - def minSnapshotOffsetToRetain(flushedOffset: Long) = { - // always retain the producer snapshot from the last two segments. This solves the common case - // of truncating to an offset within the active segment, and the rarer case of truncating to the - // previous segment just after rolling the new segment. - var minSnapshotOffset = activeSegment.baseOffset - val previousSegment = segments.lowerEntry(activeSegment.baseOffset) - if (previousSegment != null) - minSnapshotOffset = previousSegment.getValue.baseOffset - math.min(flushedOffset, minSnapshotOffset) + /** + * Cleanup old producer snapshots after the recovery point is checkpointed. It is useful to retain + * the snapshots from the recent segments in case we need to truncate and rebuild the producer state. 
+ * Otherwise, we would always need to rebuild from the earliest segment. + * + * More specifically: + * + * 1. We always retain the producer snapshot from the last two segments. This solves the common case + * of truncating to an offset within the active segment, and the rarer case of truncating to the previous segment. + * + * 2. We only delete snapshots for offsets less than the recovery point. The recovery point is checkpointed + * periodically and it can be behind after a hard shutdown. Since recovery starts from the recovery point, the logic + * of rebuilding the producer snapshots in one pass and without loading older segments is simpler if we always + * have a producer snapshot for all segments being recovered. + * + * Return the minimum snapshots offset that was retained. + */ + def deleteSnapshotsAfterRecoveryPointCheckpoint(): Long = { + val minOffsetToRetain = minSnapshotsOffsetToRetain + producerStateManager.deleteSnapshotsBefore(minOffsetToRetain) + minOffsetToRetain + } + + // Visible for testing, see `deleteSnapshotsAfterRecoveryPointCheckpoint()` for details + private[log] def minSnapshotsOffsetToRetain: Long = { + lock synchronized { + val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset + // Prefer segment base offset + val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint) + math.min(recoveryPointOffset, twoSegmentsMinOffset) + } } + private def lowerSegment(offset: Long): Option[LogSegment] = + Option(segments.lowerEntry(offset)).map(_.getValue) + /** * Completely delete this log directory and all contents from the file system with no delay */ @@ -1478,7 +1504,7 @@ class Log(@volatile var dir: File, /** * The time this log is last known to have been fully flushed to disk */ - def lastFlushTime(): Long = lastflushedTime.get + def lastFlushTime: Long = lastflushedTime.get /** * The active segment that is currently taking appends @@ -1497,7 +1523,7 @@ class 
Log(@volatile var dir: File, def logSegments(from: Long, to: Long): Iterable[LogSegment] = { lock synchronized { val floor = segments.floorKey(from) - if(floor eq null) + if (floor eq null) segments.headMap(to).values.asScala else segments.subMap(floor, true, to, false).values.asScala @@ -1638,7 +1664,7 @@ object Log { /** a time index file */ val TimeIndexFileSuffix = ".timeindex" - val PidSnapshotFileSuffix = ".snapshot" + val ProducerSnapshotFileSuffix = ".snapshot" /** an (aborted) txn index */ val TxnIndexFileSuffix = ".txnindex" @@ -1704,7 +1730,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def logFile(dir: File, offset: Long) = + def logFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) /** @@ -1722,7 +1748,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def offsetIndexFile(dir: File, offset: Long) = + def offsetIndexFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) /** @@ -1731,7 +1757,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def timeIndexFile(dir: File, offset: Long) = + def timeIndexFile(dir: File, offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix) /** @@ -1740,14 +1766,16 @@ object Log { * @param dir The directory in which the log will reside * @param offset The last offset (exclusive) included in the snapshot */ - def producerSnapshotFile(dir: File, offset: Long) = - new File(dir, filenamePrefixFromOffset(offset) + PidSnapshotFileSuffix) + def producerSnapshotFile(dir: File, offset: Long): File = + new File(dir, filenamePrefixFromOffset(offset) + ProducerSnapshotFileSuffix) - def transactionIndexFile(dir: File, offset: Long) = + def transactionIndexFile(dir: File, 
offset: Long): File = new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix) - def offsetFromFilename(filename: String): Long = + def offsetFromFile(file: File): Long = { + val filename = file.getName filename.substring(0, filename.indexOf('.')).toLong + } /** * Calculate a log's size (in bytes) based on its log segments http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d45984b..217c49e 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -87,7 +87,7 @@ import scala.collection.JavaConverters._ * @param time A way to control the passage of time */ class LogCleaner(val config: CleanerConfig, - val logDirs: Array[File], + val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel, time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup { http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index af8707c..e8fe093 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -47,7 +47,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is * requested to be resumed. 
*/ -private[log] class LogCleanerManager(val logDirs: Array[File], +private[log] class LogCleanerManager(val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { @@ -59,7 +59,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint" /* the offset checkpoints holding the last cleaned point for each log */ - @volatile private var checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap + @volatile private var checkpoints = logDirs.map(dir => + (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap /* the set of logs currently being cleaned */ private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]() @@ -88,7 +89,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], checkpoint.read() } catch { case e: KafkaStorageException => - error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e) + error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e) Map.empty[TopicPartition, Long] } }).toMap @@ -239,7 +240,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], checkpoint.write(existing) } catch { case e: KafkaStorageException => - error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e) + error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index f4bd8a2..ea03fa8 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -47,8 +47,8 @@ import scala.collection.mutable.ArrayBuffer * A background thread handles log retention by periodically truncating excess log segments. */ @threadsafe -class LogManager(logDirs: Array[File], - initialOfflineDirs: Array[File], +class LogManager(logDirs: Seq[File], + initialOfflineDirs: Seq[File], val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, @@ -63,10 +63,11 @@ class LogManager(logDirs: Array[File], brokerTopicStats: BrokerTopicStats, logDirFailureChannel: LogDirFailureChannel, time: Time) extends Logging with KafkaMetricsGroup { - val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" - val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint" + + import LogManager._ + val LockFile = ".lock" - val InitialTaskDelayMs = 30*1000 + val InitialTaskDelayMs = 30 * 1000 private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicPartition, Log]() @@ -74,11 +75,11 @@ class LogManager(logDirs: Array[File], private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs) - def liveLogDirs: Array[File] = { + def liveLogDirs: Seq[File] = { if (_liveLogDirs.size == logDirs.size) logDirs else - _liveLogDirs.asScala.toArray + _liveLogDirs.asScala.toBuffer } private val dirLocks = lockLogDirs(liveLogDirs) @@ -87,9 +88,14 @@ class LogManager(logDirs: Array[File], @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap - private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains) private val 
preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]() + private def offlineLogDirs: Iterable[File] = { + val logDirsSet = mutable.Set[File](logDirs: _*) + _liveLogDirs.asScala.foreach(logDirsSet -=) + logDirsSet + } + loadLogs() @@ -103,7 +109,7 @@ class LogManager(logDirs: Array[File], val offlineLogDirectoryCount = newGauge( "OfflineLogDirectoryCount", new Gauge[Int] { - def value = offlineLogDirs.length + def value = offlineLogDirs.size } ) @@ -165,20 +171,22 @@ class LogManager(logDirs: Array[File], Exit.halt(1) } - recoveryPointCheckpoints = recoveryPointCheckpoints.filterKeys(file => file.getAbsolutePath != dir) - logStartOffsetCheckpoints = logStartOffsetCheckpoints.filterKeys(file => file.getAbsolutePath != dir) + recoveryPointCheckpoints = recoveryPointCheckpoints.filter { case (file, _) => file.getAbsolutePath != dir } + logStartOffsetCheckpoints = logStartOffsetCheckpoints.filter { case (file, _) => file.getAbsolutePath != dir } if (cleaner != null) cleaner.handleLogDirFailure(dir) - val offlineTopicPartitions = logs.filter { case (tp, log) => log.dir.getParent == dir}.map { case (tp, log) => tp } + val offlineTopicPartitions = logs.collect { + case (tp, log) if log.dir.getParent == dir => tp + } - offlineTopicPartitions.foreach(topicPartition => { + offlineTopicPartitions.foreach { topicPartition => val removedLog = logs.remove(topicPartition) if (removedLog != null) { removedLog.closeHandlers() removedLog.removeLogMetrics() } - }) + } info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy())) } @@ -203,7 +211,7 @@ class LogManager(logDirs: Array[File], } } - private def loadLogs(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = { + private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, 
Long]): Unit = { debug("Loading log '" + logDir.getName + "'") val topicPartition = Log.parseTopicPartitionName(logDir) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) @@ -252,10 +260,7 @@ class LogManager(logDirs: Array[File], val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) if (cleanShutdownFile.exists) { - debug( - "Found clean shutdown file. " + - "Skipping recovery for all logs in data directory: " + - dir.getAbsolutePath) + debug(s"Found clean shutdown file. Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}") } else { // log recovery itself is being performed by `Log` class during initialization brokerState.newState(RecoveringFromUncleanShutdown) @@ -284,7 +289,7 @@ class LogManager(logDirs: Array[File], } yield { CoreUtils.runnable { try { - loadLogs(logDir, recoveryPoints, logStartOffsets) + loadLog(logDir, recoveryPoints, logStartOffsets) } catch { case e: IOException => offlineDirs.append((dir.getAbsolutePath, e)) @@ -315,10 +320,9 @@ class LogManager(logDirs: Array[File], logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e) } } catch { - case e: ExecutionException => { + case e: ExecutionException => error("There was an error in one of the threads during logs loading: " + e.getCause) throw e.getCause - } } finally { threadPools.foreach(_.shutdown()) } @@ -331,7 +335,7 @@ class LogManager(logDirs: Array[File], */ def startup() { /* Schedule the cleanup task to delete old logs */ - if(scheduler != null) { + if (scheduler != null) { info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs)) scheduler.schedule("kafka-log-retention", cleanupLogs _, @@ -360,7 +364,7 @@ class LogManager(logDirs: Array[File], period = defaultConfig.fileDeleteDelayMs, TimeUnit.MILLISECONDS) } - if(cleanerConfig.enableCleaner) + if (cleanerConfig.enableCleaner) cleaner.startup() } @@ -503,13 +507,17 @@ class LogManager(logDirs: Array[File], * 
Make a checkpoint for all logs in provided directory. */ private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = { - val recoveryPoints = this.logsByDir.get(dir.toString) - if (recoveryPoints.isDefined) { + for { + partitionToLog <- logsByDir.get(dir.getAbsolutePath) + checkpoint <- recoveryPointCheckpoints.get(dir) + } { try { - this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint))) + checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) + logs.values.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint()) } catch { case e: IOException => - logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point file in directory $dir", e) + logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " + + s"file in directory $dir", e) } } } @@ -518,12 +526,15 @@ class LogManager(logDirs: Array[File], * Checkpoint log start offset for all logs in provided directory. 
*/ private def checkpointLogStartOffsetsInDir(dir: File): Unit = { - val logs = this.logsByDir.get(dir.getAbsolutePath) - if (logs.isDefined) { + for { + partitionToLog <- logsByDir.get(dir.getAbsolutePath) + checkpoint <- logStartOffsetCheckpoints.get(dir) + } { try { - this.logStartOffsetCheckpoints.get(dir).foreach(_.write( - logs.get.filter { case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset }.mapValues(_.logStartOffset) - )) + val logStartOffsets = partitionToLog.filter { case (_, log) => + log.logStartOffset > log.logSegments.head.baseOffset + }.mapValues(_.logStartOffset) + checkpoint.write(logStartOffsets) } catch { case e: IOException => logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e) @@ -710,7 +721,7 @@ class LogManager(logDirs: Array[File], /** * Get all the partition logs */ - def allLogs(): Iterable[Log] = logs.values + def allLogs: Iterable[Log] = logs.values /** * Get a map of TopicPartition => Log @@ -720,10 +731,8 @@ class LogManager(logDirs: Array[File], /** * Map of log dir to logs by topic and partitions in that dir */ - private def logsByDir = { - this.logsByTopicPartition.groupBy { - case (_, log) => log.dir.getParent - } + private def logsByDir: Map[String, Map[TopicPartition, Log]] = { + this.logsByTopicPartition.groupBy { case (_, log) => log.dir.getParent } } // logDir should be an absolute path @@ -738,7 +747,7 @@ class LogManager(logDirs: Array[File], /** * Flush any log which has exceeded its flush interval and has unwritten messages. 
*/ - private def flushDirtyLogs() = { + private def flushDirtyLogs(): Unit = { debug("Checking for dirty logs to flush...") for ((topicPartition, log) <- logs) { @@ -758,6 +767,8 @@ class LogManager(logDirs: Array[File], object LogManager { + val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" + val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint" val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000 def apply(config: KafkaConfig, @@ -785,8 +796,8 @@ object LogManager { backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) - new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile).toArray, - initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile).toArray, + new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), + initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile), topicConfigs = topicConfigs, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 7c0a3da..63c1f56 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.nio.file.Files import kafka.common.KafkaException -import kafka.log.Log.offsetFromFilename +import kafka.log.Log.offsetFromFile import kafka.server.LogOffsetMetadata import kafka.utils.{Logging, nonthreadsafe, threadsafe} import org.apache.kafka.common.TopicPartition @@ -417,7 +417,25 @@ object ProducerStateManager { } } - private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix) + private def isSnapshotFile(file: File): Boolean 
= file.getName.endsWith(Log.ProducerSnapshotFileSuffix) + + // visible for testing + private[log] def listSnapshotFiles(dir: File): Seq[File] = { + if (dir.exists && dir.isDirectory) { + Option(dir.listFiles).map { files => + files.filter(f => f.isFile && isSnapshotFile(f)).toSeq + }.getOrElse(Seq.empty) + } else Seq.empty + } + + // visible for testing + private[log] def deleteSnapshotsBefore(dir: File, offset: Long): Unit = deleteSnapshotFiles(dir, _ < offset) + + private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) { + listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file => + Files.deleteIfExists(file.toPath) + } + } } @@ -444,7 +462,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, import ProducerStateManager._ import java.util - private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME private val producers = mutable.Map.empty[Long, ProducerIdEntry] private var lastMapOffset = 0L private var lastSnapOffset = 0L @@ -509,7 +526,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry) } loadedProducers.foreach(loadProducerEntry) - lastSnapOffset = offsetFromFilename(file.getName) + lastSnapOffset = offsetFromFile(file) lastMapOffset = lastSnapOffset return } catch { @@ -553,9 +570,9 @@ class ProducerStateManager(val topicPartition: TopicPartition, */ def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { // remove all out of range snapshots - deleteSnapshotFiles { snapOffset => + deleteSnapshotFiles(logDir, { snapOffset => snapOffset > logEndOffset || snapOffset <= logStartOffset - } + }) if (logEndOffset != mapEndOffset) { producers.clear() @@ -625,12 +642,12 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * Get the last offset (exclusive) of the latest snapshot file. 
*/ - def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFilename(file.getName)) + def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file)) /** * Get the last offset (exclusive) of the oldest snapshot file. */ - def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName)) + def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file)) private def isProducerRetained(producerIdEntry: ProducerIdEntry, logStartOffset: Long): Boolean = { producerIdEntry.removeBatchesOlderThan(logStartOffset) @@ -689,7 +706,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, producers.clear() ongoingTxns.clear() unreplicatedTxns.clear() - deleteSnapshotFiles() + deleteSnapshotFiles(logDir) lastSnapOffset = 0L lastMapOffset = 0L } @@ -710,25 +727,12 @@ class ProducerStateManager(val topicPartition: TopicPartition, } @threadsafe - def deleteSnapshotsBefore(offset: Long): Unit = { - deleteSnapshotFiles(_ < offset) - } - - private def listSnapshotFiles: List[File] = { - if (logDir.exists && logDir.isDirectory) { - val files = logDir.listFiles - if (files != null) - files.filter(f => f.isFile && isSnapshotFile(f.getName)).toList - else - List.empty[File] - } else - List.empty[File] - } + def deleteSnapshotsBefore(offset: Long): Unit = ProducerStateManager.deleteSnapshotsBefore(logDir, offset) private def oldestSnapshotFile: Option[File] = { val files = listSnapshotFiles if (files.nonEmpty) - Some(files.minBy(file => offsetFromFilename(file.getName))) + Some(files.minBy(offsetFromFile)) else None } @@ -736,14 +740,11 @@ class ProducerStateManager(val topicPartition: TopicPartition, private def latestSnapshotFile: Option[File] = { val files = listSnapshotFiles if (files.nonEmpty) - Some(files.maxBy(file => offsetFromFilename(file.getName))) + Some(files.maxBy(offsetFromFile)) else None } - private def deleteSnapshotFiles(predicate: Long 
=> Boolean = _ => true) { - listSnapshotFiles.filter(file => predicate(offsetFromFilename(file.getName))) - .foreach(file => Files.deleteIfExists(file.toPath)) - } + private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index e772ac3..d6b9f1b 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -82,8 +82,6 @@ abstract class AbstractFetcherThread(name: String, protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ] - case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) - protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)] override def shutdown(){ @@ -303,6 +301,8 @@ abstract class AbstractFetcherThread(name: String, object AbstractFetcherThread { + case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) + trait FetchRequest { def isEmpty: Boolean def offset(topicPartition: TopicPartition): Long http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index aa00565..8eceaa0 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2000,7 +2000,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorize(request.session, Describe, Resource.ClusterResource)) { val partitions = if 
(describeLogDirsDirRequest.isAllTopicPartitions) - replicaManager.logManager.allLogs().map(_.topicPartition).toSet + replicaManager.logManager.allLogs.map(_.topicPartition).toSet else describeLogDirsDirRequest.topicPartitions().asScala http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b90e9e8..850e882 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -19,6 +19,7 @@ package kafka.server import java.util +import AbstractFetcherThread.ResultWithPartitions import kafka.admin.AdminUtils import kafka.api.{FetchRequest => _, _} import kafka.cluster.{BrokerEndPoint, Replica} http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 98a4be1..51d845e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -607,7 +607,7 @@ class ReplicaManager(val config: KafkaConfig, * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented. 
*/ def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = { - val logsByDir = logManager.allLogs().groupBy(log => log.dir.getParent) + val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent) config.logDirs.toSet.map { logDir: String => val absolutePath = new File(logDir).getAbsolutePath http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 9cd0963..2769cb4 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -52,9 +52,9 @@ trait OffsetCheckpoint { /** * This class persists a map of (Partition => Offsets) to a file (for a certain replica) */ -class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) { - val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion, - OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent) +class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) { + val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion, + OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent) def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq) http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index a8a2ba7..f0ea50c 100755 --- 
a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -114,7 +114,7 @@ object DumpLogSegments { dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize) case Log.TimeIndexFileSuffix => dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) - case Log.PidSnapshotFileSuffix => + case Log.ProducerSnapshotFileSuffix => dumpProducerIdSnapshot(file) case Log.TxnIndexFileSuffix => dumpTxnIndex(file) @@ -145,7 +145,7 @@ object DumpLogSegments { } private def dumpTxnIndex(file: File): Unit = { - val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file) + val index = new TransactionIndex(Log.offsetFromFile(file), file) for (abortedTxn <- index.allAbortedTxns) { println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " + s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}") http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 9794b1a..6544d43 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -31,9 +31,9 @@ import org.junit.{After, Before, Test} class LogManagerTest { - val time: MockTime = new MockTime() + val time = new MockTime() val maxRollInterval = 100 - val maxLogAgeMs = 10*60*1000 + val maxLogAgeMs = 10 * 60 * 1000 val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) @@ -50,7 +50,6 @@ class LogManagerTest { logDir = TestUtils.tempDir() logManager = createLogManager() 
logManager.startup() - logDir = logManager.liveLogDirs(0) } @After @@ -58,6 +57,7 @@ class LogManagerTest { if (logManager != null) logManager.shutdown() Utils.delete(logDir) + // Some tests assign a new LogManager logManager.liveLogDirs.foreach(Utils.delete) } @@ -154,7 +154,7 @@ class LogManagerTest { time.sleep(log.config.fileDeleteDelayMs + 1) // there should be a log file, two indexes (the txn index is created lazily), - // the leader epoch checkpoint and two pid mapping files (one for the active and previous segments) + // the leader epoch checkpoint and two producer snapshot files (one for the active and previous segments) assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes) try { @@ -220,7 +220,7 @@ class LogManagerTest { @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories - val dirs = Array(TestUtils.tempDir(), + val dirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() @@ -253,7 +253,7 @@ class LogManagerTest { */ @Test def testCheckpointRecoveryPoints() { - verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager, logDir) } /** @@ -262,11 +262,9 @@ class LogManagerTest { @Test def testRecoveryDirectoryMappingWithTrailingSlash() { logManager.shutdown() - logDir = TestUtils.tempDir() - logManager = TestUtils.createLogManager( - logDirs = Array(new File(logDir.getAbsolutePath + File.separator))) + logManager = TestUtils.createLogManager(logDirs = Seq(new File(TestUtils.tempDir().getAbsolutePath + File.separator))) logManager.startup() - verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager) + verifyCheckpointRecovery(Seq(new 
TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head) } /** @@ -275,34 +273,30 @@ class LogManagerTest { @Test def testRecoveryDirectoryMappingWithRelativeDirectory() { logManager.shutdown() - logDir = new File("data" + File.separator + logDir.getName).getAbsoluteFile - logDir.mkdirs() - logDir.deleteOnExit() - logManager = createLogManager() + logManager = createLogManager(Seq(new File("data", logDir.getName).getAbsoluteFile)) logManager.startup() - verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager) + verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head) } - - private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], - logManager: LogManager) { - val logs = topicPartitions.map(this.logManager.getOrCreateLog(_, logConfig)) - logs.foreach(log => { + private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager, logDir: File) { + val logs = topicPartitions.map(logManager.getOrCreateLog(_, logConfig)) + logs.foreach { log => for (_ <- 0 until 50) log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) log.flush() - }) + } logManager.checkpointLogRecoveryOffsets() - val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() + val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read() topicPartitions.zip(logs).foreach { case (tp, log) => assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint) + assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset) } } - private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = { + private def createLogManager(logDirs: Seq[File] = Seq(this.logDir)): LogManager = { TestUtils.createLogManager( defaultConfig = logConfig, logDirs = logDirs, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6d40967..1d0fd15 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -19,7 +19,6 @@ package kafka.log import java.io._ import java.nio.ByteBuffer -import java.nio.file.Files import java.util.Properties import org.apache.kafka.common.errors._ @@ -27,8 +26,8 @@ import kafka.common.KafkaException import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ -import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel} -import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} +import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} +import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention @@ -39,7 +38,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.easymock.EasyMock import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} class LogTest { @@ -69,20 +68,20 @@ class LogTest { } @Test - def testOffsetFromFilename() { + def testOffsetFromFile() { val offset = 23423423L val logFile = Log.logFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(logFile.getName)) + assertEquals(offset, Log.offsetFromFile(logFile)) val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(offsetIndexFile.getName)) + assertEquals(offset, Log.offsetFromFile(offsetIndexFile)) val timeIndexFile = 
Log.timeIndexFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(timeIndexFile.getName)) + assertEquals(offset, Log.offsetFromFile(timeIndexFile)) val snapshotFile = Log.producerSnapshotFile(tmpDir, offset) - assertEquals(offset, Log.offsetFromFilename(snapshotFile.getName)) + assertEquals(offset, Log.offsetFromFile(snapshotFile)) } /** @@ -156,26 +155,131 @@ class LogTest { // simulate the upgrade path by creating a new log with several segments, deleting the // snapshot files, and then reloading the log val logConfig = createLogConfig(segmentBytes = 64 * 10) - val log = createLog(logDir, logConfig) + var log = createLog(logDir, logConfig) assertEquals(None, log.oldestProducerSnapshotOffset) for (i <- 0 to 100) { val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) } - assertTrue(log.logSegments.size >= 2) + val logEndOffset = log.logEndOffset + log.close() + + val cleanShutdownFile = createCleanShutdownFile() + deleteProducerSnapshotFiles() + + // Reload after clean shutdown + log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) + var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + + Utils.delete(cleanShutdownFile) + deleteProducerSnapshotFiles() + + // Reload after unclean shutdown with recoveryPoint set to log end offset + log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) + // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case + expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset) + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => - Files.delete(file.toPath) + 
deleteProducerSnapshotFiles() + + // Reload after unclean shutdown with recoveryPoint set to 0 + log = createLog(logDir, logConfig, recoveryPoint = 0L) + // Is this working as intended? + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + } + + @Test + def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = { + val logConfig = createLogConfig(segmentBytes = 64 * 10) + var log = createLog(logDir, logConfig) + assertEquals(None, log.oldestProducerSnapshotOffset) + + for (i <- 0 to 100) { + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) } - val reloadedLog = createLog(logDir, logConfig) - val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset) - expectedSnapshotsOffsets.foreach { offset => - assertTrue(Log.producerSnapshotFile(logDir, offset).exists) + assertTrue(log.logSegments.size >= 5) + val segmentOffsets = log.logSegments.toVector.map(_.baseOffset) + val activeSegmentOffset = segmentOffsets.last + + // We want the recovery point to be past the segment offset and before the last 2 segments including a gap of + // 1 segment. We collect the data before closing the log. 
+ val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3) + val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4) + val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.partition(_ < offsetForRecoveryPointSegment) + val recoveryPoint = offsetForRecoveryPointSegment + 1 + assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint) + log.close() + + val segmentsWithReads = ArrayBuffer[LogSegment]() + val recoveredSegments = ArrayBuffer[LogSegment]() + + def createLogWithInterceptedReads(recoveryPoint: Long) = { + val maxProducerIdExpirationMs = 60 * 60 * 1000 + val topicPartition = Log.parseTopicPartitionName(logDir) + val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs) + + // Intercept all segment read calls + new Log(logDir, logConfig, logStartOffset = 0, recoveryPoint = recoveryPoint, mockTime.scheduler, + brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, + topicPartition, producerStateManager, new LogDirFailureChannel(10)) { + + override def addSegment(segment: LogSegment): LogSegment = { + val wrapper = new LogSegment(segment.log, segment.index, segment.timeIndex, segment.txnIndex, segment.baseOffset, + segment.indexIntervalBytes, segment.rollJitterMs, mockTime) { + + override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long, + minOneMessage: Boolean): FetchDataInfo = { + new Exception().printStackTrace() + segmentsWithReads += this + super.read(startOffset, maxOffset, maxSize, maxPosition, minOneMessage) + } + + override def recover(producerStateManager: ProducerStateManager, + leaderEpochCache: Option[LeaderEpochCache]): Int = { + recoveredSegments += this + super.recover(producerStateManager, leaderEpochCache) + } + } + super.addSegment(wrapper) + } + } } + + // Retain snapshots for the last 2 segments + 
ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2)) + log = createLogWithInterceptedReads(offsetForRecoveryPointSegment) + // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour) + assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset) -- Seq(activeSegmentOffset)) + assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) + var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() + segmentsWithReads.clear() + recoveredSegments.clear() + + // Only delete snapshots before the base offset of the recovery point segment (post KAFKA-5829 behaviour) to + // avoid reading all segments + ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment) + log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint) + assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset)) + assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + + // Verify that we keep 2 snapshot files if we checkpoint the log end offset + log.deleteSnapshotsAfterRecoveryPointCheckpoint() + expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset + assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + log.close() } @Test @@ -192,7 +296,7 @@ class LogTest { } @Test - def testPidMapOffsetUpdatedForNonIdempotentData() { + def testProducerIdMapOffsetUpdatedForNonIdempotentData() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, 
"value".getBytes))) @@ -343,7 +447,7 @@ class LogTest { logDirFailureChannel = null) EasyMock.verify(stateManager) - cleanShutdownFile.delete() + Utils.delete(cleanShutdownFile) } @Test @@ -379,11 +483,11 @@ class LogTest { logDirFailureChannel = null) EasyMock.verify(stateManager) - cleanShutdownFile.delete() + Utils.delete(cleanShutdownFile) } @Test - def testRebuildPidMapWithCompactedData() { + def testRebuildProducerIdMapWithCompactedData() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) val pid = 1L @@ -467,7 +571,7 @@ class LogTest { } @Test - def testUpdatePidMapWithCompactedData() { + def testUpdateProducerIdMapWithCompactedData() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) val pid = 1L @@ -500,7 +604,7 @@ class LogTest { } @Test - def testPidMapTruncateTo() { + def testProducerIdMapTruncateTo() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) @@ -520,7 +624,7 @@ class LogTest { } @Test - def testPidMapTruncateToWithNoSnapshots() { + def testProducerIdMapTruncateToWithNoSnapshots() { // This ensures that the upgrade optimization path cannot be hit after initial loading val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) @@ -532,10 +636,7 @@ class LogTest { log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 1), leaderEpoch = 0) - // Delete all snapshots prior to truncating - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => - Files.delete(file.toPath) - } + deleteProducerSnapshotFiles() log.truncateTo(1L) assertEquals(1, log.activeProducersWithLastSequence.size) @@ -612,7 +713,7 @@ class LogTest { } @Test - def 
testPidMapTruncateFullyAndStartAt() { + def testProducerIdMapTruncateFullyAndStartAt() { val records = TestUtils.singletonRecords("foo".getBytes) val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) val log = createLog(logDir, logConfig) @@ -634,7 +735,7 @@ class LogTest { } @Test - def testPidExpirationOnSegmentDeletion() { + def testProducerIdExpirationOnSegmentDeletion() { val pid1 = 1L val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0) val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) @@ -660,7 +761,7 @@ class LogTest { } @Test - def testTakeSnapshotOnRollAndDeleteSnapshotOnFlush() { + def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() { val logConfig = createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) @@ -677,9 +778,11 @@ class LogTest { log.roll(3L) assertEquals(Some(3L), log.latestProducerSnapshotOffset) - // roll triggers a flush at the starting offset of the new segment. 
we should - // retain the snapshots from the active segment and the previous segment, but - // the oldest one should be gone + // roll triggers a flush at the starting offset of the new segment, we should retain all snapshots + assertEquals(Some(1L), log.oldestProducerSnapshotOffset) + + // retain the snapshots from the active segment and the previous segment, delete the oldest one + log.deleteSnapshotsAfterRecoveryPointCheckpoint() assertEquals(Some(2L), log.oldestProducerSnapshotOffset) // even if we flush within the active segment, the snapshot should remain @@ -729,7 +832,7 @@ class LogTest { } @Test - def testPeriodicPidExpiration() { + def testPeriodicProducerIdExpiration() { val maxProducerIdExpirationMs = 200 val producerIdExpirationCheckIntervalMs = 100 @@ -824,7 +927,7 @@ class LogTest { } @Test - def testMultiplePidsPerMemoryRecord() : Unit = { + def testMultipleProducerIdsPerMemoryRecord() : Unit = { // create a log val log = createLog(logDir, LogConfig()) @@ -1199,7 +1302,7 @@ class LogTest { maxOffset = Some(numMessages + 1)).records assertEquals("Should be no more messages", 0, lastRead.records.asScala.size) - // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure + // check that rolling the log forced a flushed, the flush is async so retry in case of failure TestUtils.retry(1000L){ assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset) } @@ -1375,7 +1478,8 @@ class LogTest { } log.close() - def verifyRecoveredLog(log: Log) { + def verifyRecoveredLog(log: Log, expectedRecoveryPoint: Long) { + assertEquals(s"Unexpected recovery point", expectedRecoveryPoint, log.recoveryPoint) assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as 
before.", numIndexEntries, log.activeSegment.index.entries) @@ -1385,12 +1489,12 @@ class LogTest { } log = createLog(logDir, logConfig, recoveryPoint = lastOffset) - verifyRecoveredLog(log) + verifyRecoveredLog(log, lastOffset) log.close() // test recovery case log = createLog(logDir, logConfig) - verifyRecoveredLog(log) + verifyRecoveredLog(log, lastOffset) log.close() } @@ -1827,7 +1931,7 @@ class LogTest { recoveryPoint = log.logEndOffset log = createLog(logDir, logConfig) assertEquals(recoveryPoint, log.logEndOffset) - cleanShutdownFile.delete() + Utils.delete(cleanShutdownFile) } @Test @@ -2584,10 +2688,7 @@ class LogTest { appendPid4(4) // 89 appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 - // delete all snapshot files - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => - Files.delete(file.toPath) - } + deleteProducerSnapshotFiles() // delete the last offset and transaction index files to force recovery. 
this should force us to rebuild // the producer state from the start of the log @@ -2930,4 +3031,13 @@ class LogTest { assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) cleanShutdownFile } + + private def deleteProducerSnapshotFiles(): Unit = { + val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)) + files.foreach(Utils.delete) + } + + private def listProducerSnapshotOffsets: Seq[Long] = + ProducerStateManager.listSnapshotFiles(logDir).map(Log.offsetFromFile).sorted + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 8650624..67b1b15 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -741,6 +741,6 @@ class ProducerStateManagerTest extends JUnitSuite { } private def currentSnapshotOffsets = - logDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet + logDir.listFiles.map(Log.offsetFromFile).toSet } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index fc49d8c..b95f66c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -17,6 +17,7 @@ package kafka.server +import AbstractFetcherThread._ import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint import 
kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData} http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 79acfa7..1a75f94 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -37,7 +37,7 @@ class HighwatermarkPersistenceTest { val zkUtils = EasyMock.createMock(classOf[ZkUtils]) val logManagers = configs map { config => TestUtils.createLogManager( - logDirs = config.logDirs.map(new File(_)).toArray, + logDirs = config.logDirs.map(new File(_)), cleanerConfig = CleanerConfig()) } @@ -47,7 +47,7 @@ class HighwatermarkPersistenceTest { @After def teardown() { - for(manager <- logManagers; dir <- manager.liveLogDirs) + for (manager <- logManagers; dir <- manager.liveLogDirs) Utils.delete(dir) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b8c78ff..57fe5b5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -67,7 +67,7 @@ class ReplicaManagerTest { def testHighWaterMarkDirectoryMapping() { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_))) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) @@ -86,7 +86,7 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) @@ -104,7 +104,7 @@ class ReplicaManagerTest { def testIllegalRequiredAcks() { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName)) @@ -133,7 +133,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val logProps = new Properties() - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps)) + val mockLogMgr = 
TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps)) val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() @@ -594,7 +594,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val logProps = new Properties() - val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps)) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps)) val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 66845c1..23c40fe 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -24,6 +24,7 @@ import kafka.api.FetchRequestBuilder import kafka.message.ByteBufferMessageSet import java.io.File +import kafka.log.LogManager import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} import org.junit.{Before, Test} @@ -67,7 +68,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { // do a clean shutdown and check that offset checkpoint file exists server.shutdown() for (logDir <- config.logDirs) { - val 
OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile) + val OffsetCheckpointFile = new File(logDir, LogManager.RecoveryPointCheckpointFile) assertTrue(OffsetCheckpointFile.exists) assertTrue(OffsetCheckpointFile.length() > 0) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3a603cb/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 687307a..98683a5 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -836,8 +836,8 @@ object TestUtils extends Logging { return } catch { case e: AssertionError => - val ellapsed = System.currentTimeMillis - startTime - if(ellapsed > maxWaitMs) { + val elapsed = System.currentTimeMillis - startTime + if (elapsed > maxWaitMs) { throw e } else { info("Attempt failed, sleeping for " + wait + ", and then retrying.") @@ -1019,7 +1019,7 @@ object TestUtils extends Logging { /** * Create new LogManager instance with default configuration for testing */ - def createLogManager(logDirs: Array[File] = Array.empty[File], + def createLogManager(logDirs: Seq[File] = Seq.empty[File], defaultConfig: LogConfig = LogConfig(), cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false), time: MockTime = new MockTime()): LogManager = {
