This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new 07c5282 KAFKA-7215: Improve LogCleaner Error Handling (#5439) 07c5282 is described below commit 07c5282d25cb04cf1212c3daec5c7d8798f9efa1 Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Mon Oct 8 20:54:37 2018 +0100 KAFKA-7215: Improve LogCleaner Error Handling (#5439) The thread no longer dies. When encountering an unexpected error, it marks the partition as "uncleanable" which means it will not try to clean its logs in subsequent runs. Reviewers: Dhruvil Shah <dhru...@confluent.io>, Jun Rao <jun...@gmail.com> --- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogCleaner.scala | 107 ++++-- .../main/scala/kafka/log/LogCleanerManager.scala | 86 ++++- .../scala/kafka/server/LogDirFailureChannel.scala | 3 +- .../log/AbstractLogCleanerIntegrationTest.scala | 30 ++ .../unit/kafka/log/LogCleanerIntegrationTest.scala | 389 ++++----------------- .../kafka/log/LogCleanerLagIntegrationTest.scala | 12 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 193 ++++++++-- ...> LogCleanerParameterizedIntegrationTest.scala} | 35 +- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 7 +- core/src/test/scala/unit/kafka/log/LogUtils.scala | 41 +++ 11 files changed, 479 insertions(+), 426 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8915c14..094473a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -193,7 +193,7 @@ class Log(@volatile var dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object - // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers() + // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() // After memory mapped buffer is closed, no disk IO operation should be performed for this log @volatile private var isMemoryMappedBufferClosed = false diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index bf4f7e1..0416325 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ import scala.collection.{Iterable, Set, mutable} +import scala.util.control.ControlThrowable /** * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. @@ -293,49 +294,75 @@ class LogCleaner(initialConfig: CleanerConfig, /** * The main loop for the cleaner thread + * Clean a log if there is a dirty log available, otherwise sleep for a bit */ override def doWork() { - cleanOrSleep() + val cleaned = cleanFilthiestLog() + if (!cleaned) + pause(config.backOffMs, TimeUnit.MILLISECONDS) } /** - * Clean a log if there is a dirty log available, otherwise sleep for a bit - */ - private def cleanOrSleep() { - val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { - case None => - false - case Some(cleanable) => - // there's a log, clean it - var endOffset = cleanable.firstDirtyOffset - try { - val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) - recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) - endOffset = nextDirtyOffset - } catch { - case _: LogCleaningAbortedException => // task can be aborted, let it go. - case _: KafkaStorageException => // partition is already offline. let it go. - case e: IOException => - val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException" - logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e) - } finally { - cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) + * Cleans a log if there is a dirty log available + * @return whether a log was cleaned + */ + private def cleanFilthiestLog(): Boolean = { + var currentLog: Option[Log] = None + + try { + val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { + case None => + false + case Some(cleanable) => + // there's a log, clean it + currentLog = Some(cleanable.log) + cleanLog(cleanable) + true + } + val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() + try { + deletable.foreach { + case (topicPartition, log) => + try { + currentLog = Some(log) + log.deleteOldSegments() + } } - true + } finally { + cleanerManager.doneDeleting(deletable.map(_._1)) + } + + cleaned + } catch { + case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e + case e: Exception => + if (currentLog.isEmpty) { + throw new IllegalStateException("currentLog cannot be empty on an unexpected exception", e) + } + val erroneousLog = currentLog.get + warn(s"Unexpected exception thrown when cleaning log $erroneousLog. Marking its partition (${erroneousLog.topicPartition}) as uncleanable", e) + cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition) + + false } - val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() + } + private def cleanLog(cleanable: LogToClean): Unit = { + var endOffset = cleanable.firstDirtyOffset try { - deletable.foreach { - case (_, log) => - log.deleteOldSegments() - } + val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) + recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) + endOffset = nextDirtyOffset + } catch { + case _: LogCleaningAbortedException => // task can be aborted, let it go. + case _: KafkaStorageException => // partition is already offline. let it go. + case e: IOException => + var logDirectory = cleanable.log.dir.getParent + val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException" + logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e) } finally { - cleanerManager.doneDeleting(deletable.map(_._1)) + cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) } - - if (!cleaned) - pause(config.backOffMs, TimeUnit.MILLISECONDS) } /** @@ -398,6 +425,18 @@ object LogCleaner { LogSegment.open(log.dir, baseOffset, log.config, Time.SYSTEM, fileAlreadyExists = false, fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate) } + + /** + * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log + * @return the biggest uncleanable offset and the total amount of cleanable bytes + */ + def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = { + val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment) + val firstUncleanableOffset = firstUncleanableSegment.baseOffset + val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum + + (firstUncleanableOffset, cleanableBytes) + } } /** @@ -951,9 +990,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) { */ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] { val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum - private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment) - val firstUncleanableOffset = firstUncleanableSegment.baseOffset - val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum + val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset) val totalBytes = cleanBytes + cleanableBytes val cleanableRatio = cleanableBytes / totalBytes.toDouble override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 7a96d8f..13d14c1 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -73,12 +73,55 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], /* the set of logs currently being cleaned */ private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]() + /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning) + * for each log directory */ + private val uncleanablePartitions = mutable.HashMap[String, mutable.Set[TopicPartition]]() + + /* the set of directories marked as uncleanable and therefore offline */ + private val uncleanableDirs = mutable.HashSet[String]() + /* a global lock used to control all access to the in-progress set and the offset checkpoints */ private val lock = new ReentrantLock /* for coordinating the pausing and the cleaning of a partition */ private val pausedCleaningCond = lock.newCondition() + /* gauges for tracking the number of partitions marked as uncleanable for each log directory */ + for (dir <- logDirs) { + newGauge( + "uncleanable-partitions-count", + new Gauge[Int] { def value = inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) } }, + Map("logDirectory" -> dir.getAbsolutePath) + ) + } + + /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */ + for (dir <- logDirs) { + newGauge( + "uncleanable-bytes", + new Gauge[Long] { + def value = { + inLock(lock) { + uncleanablePartitions.get(dir.getAbsolutePath) match { + case Some(partitions) => { + val lastClean = allCleanerCheckpoints + val now = Time.SYSTEM.milliseconds + partitions.map { tp => + val log = logs.get(tp) + val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, tp, lastClean, now) + val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset) + uncleanableBytes + }.sum + } + case _ => 0 + } + } + } + }, + Map("logDirectory" -> dir.getAbsolutePath) + ) + } + /* a gauge for tracking the cleanable ratio of the dirtiest log */ @volatile private var dirtiestLogCleanableRatio = 0.0 newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt }) @@ -135,7 +178,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val dirtyLogs = logs.filter { case (_, log) => log.config.compact // match logs that are marked as compacted }.filterNot { - case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress + case (topicPartition, log) => + // skip any logs already in-progress and uncleanable partitions + inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition) }.map { case (topicPartition, log) => // create a LogToClean instance for each val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition, @@ -179,13 +224,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } /** - * Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments + * Find any logs that have compaction enabled. Mark them as being cleaned + * Include logs without delete enabled, as they may have segments * that precede the start offset. */ def deletableLogs(): Iterable[(TopicPartition, Log)] = { inLock(lock) { val toClean = logs.filter { case (topicPartition, log) => - !inProgress.contains(topicPartition) && log.config.compact + !inProgress.contains(topicPartition) && log.config.compact && + !isUncleanablePartition(log, topicPartition) } toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) } toClean @@ -332,6 +379,12 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], case e: KafkaStorageException => error(s"Failed to access checkpoint file in dir ${sourceLogDir.getAbsolutePath}", e) } + + val logUncleanablePartitions = uncleanablePartitions.getOrElse(sourceLogDir.toString, mutable.Set[TopicPartition]()) + if (logUncleanablePartitions.contains(topicPartition)) { + logUncleanablePartitions.remove(topicPartition) + markPartitionUncleanable(destLogDir.toString, topicPartition) + } } } @@ -393,6 +446,33 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } } + + /** + * Returns an immutable set of the uncleanable partitions for a given log directory + * Only used for testing + */ + private[log] def uncleanablePartitions(logDir: String): Set[TopicPartition] = { + var partitions: Set[TopicPartition] = Set() + inLock(lock) { partitions ++= uncleanablePartitions.getOrElse(logDir, partitions) } + partitions + } + + def markPartitionUncleanable(logDir: String, partition: TopicPartition): Unit = { + inLock(lock) { + uncleanablePartitions.get(logDir) match { + case Some(partitions) => + partitions.add(partition) + case None => + uncleanablePartitions.put(logDir, mutable.Set(partition)) + } + } + } + + private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = { + inLock(lock) { + uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition)) + } + } } private[log] object LogCleanerManager extends Logging { diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala index c78f04e..897d3fc 100644 --- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala +++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala @@ -45,9 +45,8 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging { */ def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = { error(msg, e) - if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) { + if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) offlineLogDirQueue.add(logDir) - } } /* diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 0ad5b46..2a483fa 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -24,10 +24,13 @@ import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils.{MockTime, Pool, TestUtils} import kafka.utils.Implicits._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch} import org.apache.kafka.common.utils.Utils import org.junit.After +import scala.collection.Seq import scala.collection.mutable.ListBuffer +import scala.util.Random abstract class AbstractLogCleanerIntegrationTest { @@ -118,4 +121,31 @@ abstract class AbstractLogCleanerIntegrationTest { logDirFailureChannel = new LogDirFailureChannel(1), time = time) } + + def codec: CompressionType + private var ctr = 0 + def counter: Int = ctr + def incCounter(): Unit = ctr += 1 + + def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, + startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = { + for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { + val value = counter.toString + val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec, + key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0) + incCounter() + (key, value, appendInfo.firstOffset.get) + } + } + + def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = { + def messageValue(length: Int): String = { + val random = new Random(0) + new String(random.alphanumeric.take(length).toArray) + } + val value = messageValue(128) + val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes, + magicValue = messageFormatVersion) + (value, messageSet) + } } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala old mode 100755 new mode 100644 index 64e8b38..bfee811 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -1,349 +1,96 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package kafka.log -import java.io.File -import java.util.Properties +import java.io.PrintWriter -import kafka.api.KAFKA_0_11_0_IV0 -import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0} -import kafka.server.KafkaConfig -import kafka.server.checkpoints.OffsetCheckpointFile -import kafka.utils._ +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.Gauge +import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.record._ -import org.junit.Assert._ -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.runners.Parameterized.Parameters +import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS +import org.apache.kafka.common.record.{CompressionType, RecordBatch} +import org.junit.Assert.{assertFalse, assertTrue, fail} +import org.junit.Test -import scala.Seq -import scala.collection._ -import scala.util.Random +import scala.collection.JavaConverters.mapAsScalaMapConverter /** - * This is an integration test that tests the fully integrated log cleaner - */ -@RunWith(value = classOf[Parameterized]) -class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest { + * This is an integration test that tests the fully integrated log cleaner + */ +class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest { + + val codec: CompressionType = CompressionType.LZ4 - val codec = CompressionType.forName(compressionCodec) val time = new MockTime() - var counter = 0 val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)) - @Test - def cleanerTest() { + @Test(timeout = DEFAULT_MAX_WAIT_MS) + def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics() { val largeMessageKey = 20 - val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE) + val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE) val maxMessageSize = largeMessageSet.sizeInBytes + cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, backOffMs = 100) - cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) - val log = cleaner.logs.get(topicPartitions(0)) - - val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec) - val startSize = log.size - cleaner.startup() - - val firstDirty = log.activeSegment.baseOffset - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.map(_.size).sum - assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) - - checkLogAfterAppendingDups(log, startSize, appends) - - val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0) - val largeMessageOffset = appendInfo.firstOffset.get - - val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec) - val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups - val firstDirty2 = log.activeSegment.baseOffset - checkLastCleaned("log", 0, firstDirty2) - - checkLogAfterAppendingDups(log, startSize, appends2) - - // simulate deleting a partition, by removing it from logs - // force a checkpoint - // and make sure its gone from checkpoint file - cleaner.logs.remove(topicPartitions(0)) - cleaner.updateCheckpoints(logDir) - val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read() - // we expect partition 0 to be gone - assertFalse(checkpoints.contains(topicPartitions(0))) - } - - @Test - def testCleansCombinedCompactAndDeleteTopic(): Unit = { - val logProps = new Properties() - val retentionMs: Integer = 100000 - logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer) - logProps.put(LogConfig.CleanupPolicyProp, "compact,delete") - - def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = { - cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L) - val log = cleaner.logs.get(topicPartitions(0)) - - val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec) - val startSize = log.size - - log.onHighWatermarkIncremented(log.logEndOffset) - - val firstDirty = log.activeSegment.baseOffset - cleaner.startup() - - // should compact the log - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.map(_.size).sum - assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) - (log, messages) - } - - val (log, _) = runCleanerAndCheckCompacted(100) - // should delete old segments - log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs)) - - TestUtils.waitUntilTrue(() => log.numberOfSegments == 1, "There should only be 1 segment remaining", 10000L) - assertEquals(1, log.numberOfSegments) + def breakPartitionLog(tp: TopicPartition): Unit = { + val log = cleaner.logs.get(tp) + writeDups(numKeys = 20, numDups = 3, log = log, codec = codec) - cleaner.shutdown() + val partitionFile = log.logSegments.last.log.file() + val writer = new PrintWriter(partitionFile) + writer.write("jogeajgoea") + writer.close() - // run the cleaner again to make sure if there are no issues post deletion - val (log2, messages) = runCleanerAndCheckCompacted(20) - val read = readFromLog(log2) - assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read)) - } - - // returns (value, ByteBufferMessageSet) - private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = { - def messageValue(length: Int): String = { - val random = new Random(0) - new String(random.alphanumeric.take(length).toArray) + writeDups(numKeys = 20, numDups = 3, log = log, codec = codec) } - val value = messageValue(128) - val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes, - magicValue = messageFormatVersion) - (value, messageSet) - } - @Test - def testCleanerWithMessageFormatV0(): Unit = { - val largeMessageKey = 20 - val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0) - val maxMessageSize = codec match { - case CompressionType.NONE => largeMessageSet.sizeInBytes - case _ => - // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to - // increase because the broker offsets are larger than the ones assigned by the client - // adding `5` to the message set size is good enough for this test: it covers the increased message size while - // still being less than the overhead introduced by the conversion from message format version 0 to 1 - largeMessageSet.sizeInBytes + 5 + def getGauge[T](metricName: String, metricScope: String): Gauge[T] = { + Metrics.defaultRegistry.allMetrics.asScala + .filterKeys(k => { + k.getName.endsWith(metricName) && k.getScope.endsWith(metricScope) + }) + .headOption + .getOrElse { fail(s"Unable to find metric $metricName") } + .asInstanceOf[(Any, Gauge[Any])] + ._2 + .asInstanceOf[Gauge[T]] } - cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) - - val log = cleaner.logs.get(topicPartitions(0)) - val props = logConfigProperties(maxMessageSize = maxMessageSize) - props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version) - log.config = new LogConfig(props) + breakPartitionLog(topicPartitions(0)) + breakPartitionLog(topicPartitions(1)) - val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0) - val startSize = log.size cleaner.startup() - val firstDirty = log.activeSegment.baseOffset - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.map(_.size).sum - assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) - - checkLogAfterAppendingDups(log, startSize, appends) - - val appends2: Seq[(Int, String, Long)] = { - val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0) - val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0) - val largeMessageOffset = appendInfo.firstOffset.get - - // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly - props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version) - log.config = new LogConfig(props) - val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1) - val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2) - appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2 - } - val firstDirty2 = log.activeSegment.baseOffset - checkLastCleaned("log", 0, firstDirty2) - - checkLogAfterAppendingDups(log, startSize, appends2) - } - - @Test - def testCleaningNestedMessagesWithMultipleVersions(): Unit = { - val maxMessageSize = 192 - cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) - val log = cleaner.logs.get(topicPartitions(0)) - val props = logConfigProperties(maxMessageSize = maxMessageSize) - props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version) - log.config = new LogConfig(props) - - // with compression enabled, these messages will be written as a single message containing - // all of the individual messages - var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0) - appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0) - - props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version) - log.config = new LogConfig(props) - - var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1) - appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1) - appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1) - - val appends = appendsV0 ++ appendsV1 - - val startSize = log.size - cleaner.startup() - - val firstDirty = log.activeSegment.baseOffset - assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1 - - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.map(_.size).sum - assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) - - checkLogAfterAppendingDups(log, startSize, appends) - } - - @Test - def cleanerConfigUpdateTest() { - val largeMessageKey = 20 - val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE) - val maxMessageSize = largeMessageSet.sizeInBytes - - cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize, - cleanerIoBufferSize = Some(1)) - val log = cleaner.logs.get(topicPartitions(0)) - - writeDups(numKeys = 100, numDups = 3, log = log, codec = codec) - val startSize = log.size - cleaner.startup() - assertEquals(1, cleaner.cleanerCount) - - // Verify no cleaning with LogCleanerIoBufferSizeProp=1 - val firstDirty = log.activeSegment.baseOffset - val topicPartition = new TopicPartition("log", 0) - cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10) - assertTrue("Should not have cleaned", cleaner.cleanerManager.allCleanerCheckpoints.isEmpty) - - def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = { - val props = TestUtils.createBrokerConfig(0, "localhost:2181") - props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString) - props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString) - props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString) - props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString) - props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString) - props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString) - props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString) - KafkaConfig.fromProps(props) - } - - // Verify cleaning done with larger LogCleanerIoBufferSizeProp - val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig) - val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2, - dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize, - dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor, - ioBufferSize = 100000, - maxMessageSize = cleaner.currentConfig.maxMessageSize, - maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond, - backOffMs = cleaner.currentConfig.backOffMs)) - cleaner.reconfigure(oldConfig, newConfig) - - assertEquals(2, cleaner.cleanerCount) - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.map(_.size).sum - assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) - } - - private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) { - // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than - // LogConfig.MinCleanableDirtyRatioProp - val topicPartition = new TopicPartition(topic, partitionId) - cleaner.awaitCleaned(topicPartition, firstDirty) - val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition) - assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", - lastCleaned >= firstDirty) - } - - private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]) { - val read = readFromLog(log) - assertEquals("Contents of the map shouldn't change", toMap(appends), toMap(read)) - assertTrue(startSize > log.size) - } - - private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = { - messages.map { case (key, value, offset) => key -> (value, offset) }.toMap - } - - private def readFromLog(log: Log): Iterable[(Int, String, Long)] = { - import JavaConverters._ - for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield { - val key = TestUtils.readString(deepLogEntry.key).toInt - val value = TestUtils.readString(deepLogEntry.value) - (key, value, deepLogEntry.offset) - } - } - - private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, - startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = { - for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { - val value = counter.toString - val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec, - key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0) - counter += 1 - (key, value, appendInfo.firstOffset.get) - } - } - - private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, - startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = { - val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { - val payload = counter.toString - counter += 1 - (key, payload) - } - - val records = kvs.map { case (key, payload) => - new SimpleRecord(key.toString.getBytes, payload.toString.getBytes) - } - - val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0) - val offsets = appendInfo.firstOffset.get to appendInfo.lastOffset - - kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) } - } - -} - -object LogCleanerIntegrationTest { - @Parameters - def parameters: java.util.Collection[Array[String]] = { - val list = new java.util.ArrayList[Array[String]]() - for (codec <- CompressionType.values) - list.add(Array(codec.name)) - list + val log2 = cleaner.logs.get(topicPartitions(1)) + val uncleanableDirectory = log.dir.getParent + val uncleanablePartitionsCountGauge = getGauge[Int]("uncleanable-partitions-count", uncleanableDirectory) + val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory) + + TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L) + val expectedTotalUncleanableBytes = LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 + + LogCleaner.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2 + TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes, + s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L) + + val uncleanablePartitions = cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory) + assertTrue(uncleanablePartitions.contains(topicPartitions(0))) + assertTrue(uncleanablePartitions.contains(topicPartitions(1))) + assertFalse(uncleanablePartitions.contains(topicPartitions(2))) } } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala index bf634d7..6e8c9b9 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala @@ -41,9 +41,10 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` val cleanerBackOffMs = 200L val segmentSize = 512 - var counter = 0 + + override def codec: CompressionType = CompressionType.forName(compressionCodecName) + val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)) - val compressionCodec = CompressionType.forName(compressionCodecName) @Test def cleanerTest(): Unit = { @@ -55,7 +56,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac // t = T0 val T0 = time.milliseconds - val appends0 = writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T0) + val appends0 = writeDups(numKeys = 100, numDups = 3, log, codec, timestamp = T0) val startSizeBlock0 = log.size debug(s"total log size at T0: $startSizeBlock0") @@ -78,7 +79,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac val T1 = time.milliseconds // write another block of data - val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T1) + val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, codec, timestamp = T1) val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset // the first block should get cleaned @@ -111,11 +112,10 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac val count = counter log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0) - counter += 1 + incCounter() (key, count) } } - } object LogCleanerLagIntegrationTest { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 2a48690..8ca26a8 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -29,6 +29,8 @@ import org.junit.Assert._ import org.junit.{After, Test} import org.scalatest.junit.JUnitSuite +import scala.collection.mutable + /** * Unit tests for the log cleaning logic */ @@ -36,6 +38,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val tmpDir = TestUtils.tempDir() val logDir = TestUtils.randomPartitionLogDir(tmpDir) + val topicPartition = new TopicPartition("log", 0) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer) @@ -43,11 +46,124 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val logConfig = LogConfig(logProps) val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` + val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]() + + class LogCleanerManagerMock(logDirs: Seq[File], + logs: Pool[TopicPartition, Log], + logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) { + override def allCleanerCheckpoints: Map[TopicPartition, Long] = { + cleanerCheckpoints.toMap + } + } + @After def tearDown(): Unit = { Utils.delete(tmpDir) } + @Test + def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes) + val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1) + val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2) + val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3) + + val logs = new Pool[TopicPartition, Log]() + val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0 + logs.put(tp1, log1) + val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10 + logs.put(tp2, log2) + val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20 + logs.put(tp3, log3) + val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock] + cleanerCheckpoints.put(tp1, 0) // all clean + cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages + cleanerCheckpoints.put(tp3, 15) // 5 dirty messages + + val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get + + assertEquals(log2, filthiestLog.log) + assertEquals(tp2, filthiestLog.topicPartition) + } + + @Test + def testGrabFilthiestCompactedLogIgnoresUncleanablePartitions(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes) + val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1) + val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2) + val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3) + + val logs = new Pool[TopicPartition, Log]() + val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0 + logs.put(tp1, log1) + val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10 + logs.put(tp2, log2) + val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20 + logs.put(tp3, log3) + val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock] + cleanerCheckpoints.put(tp1, 0) // all clean + cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages + cleanerCheckpoints.put(tp3, 15) // 5 dirty messages + cleanerManager.markPartitionUncleanable(log2.dir.getParent, tp2) + + val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get + + assertEquals(log3, filthiestLog.log) + assertEquals(tp3, filthiestLog.topicPartition) + } + + @Test + def testGrabFilthiestCompactedLogIgnoresInProgressPartitions(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes) + val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1) + val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2) + val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3) + + val logs = new Pool[TopicPartition, Log]() + val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0 + logs.put(tp1, log1) + val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10 + logs.put(tp2, log2) + val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20 + logs.put(tp3, log3) + val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock] + cleanerCheckpoints.put(tp1, 0) // all clean + cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages + cleanerCheckpoints.put(tp3, 15) // 5 dirty messages + cleanerManager.setCleaningState(tp2, LogCleaningInProgress) + + val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get + + assertEquals(log3, filthiestLog.log) + assertEquals(tp3, filthiestLog.topicPartition) + } + + @Test + def testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes) + val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1) + val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2) + val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3) + + val logs = new Pool[TopicPartition, Log]() + val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0 + logs.put(tp1, log1) + val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10 + logs.put(tp2, log2) + val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20 + logs.put(tp3, log3) + val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock] + cleanerCheckpoints.put(tp1, 0) // all clean + cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages + cleanerCheckpoints.put(tp3, 15) // 5 dirty messages + cleanerManager.setCleaningState(tp2, LogCleaningInProgress) + cleanerManager.markPartitionUncleanable(log3.dir.getParent, tp3) + + val filthiestLog: Option[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time) + + assertTrue(filthiestLog.isEmpty) + } + /** * When checking for logs with segments ready for deletion * we shouldn't consider logs where cleanup.policy=delete @@ -166,7 +282,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { */ @Test def testConcurrentLogCleanupAndTopicDeletion(): Unit = { - val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes) val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete) val cleanerManager: LogCleanerManager = createCleanerManager(log) @@ -181,6 +297,21 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { } /** + * When looking for logs with segments ready to be deleted we shouldn't consider + * logs that have had their partition marked as uncleanable. + */ + @Test + def testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) + val cleanerManager: LogCleanerManager = createCleanerManager(log) + cleanerManager.markPartitionUncleanable(log.dir.getParent, topicPartition) + + val readyToDelete = cleanerManager.deletableLogs().size + assertEquals("should have 0 logs ready to be deleted", 0, readyToDelete) + } + + /** * Test computation of cleanable range with no minimum compaction lag settings active */ @Test @@ -193,7 +324,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) - val topicPartition = new TopicPartition("log", 0) val lastClean = Map(topicPartition -> 0L) val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) @@ -224,7 +354,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { while (log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0) - val topicPartition = new TopicPartition("log", 0) val lastClean = Map(topicPartition -> 0L) val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) @@ -250,7 +379,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { time.sleep(compactionLag + 1) - val topicPartition = new TopicPartition("log", 0) val lastClean = Map(topicPartition -> 0L) val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds) assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1) @@ -259,7 +387,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { @Test def testUndecidedTransactionalDataNotCleanable(): Unit = { - val topicPartition = new TopicPartition("log", 0) val compactionLag = 60 * 60 * 1000 val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) @@ -315,21 +442,20 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val cleanerManager: LogCleanerManager = createCleanerManager(log) - val tp = new TopicPartition("log", 0) - intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1)) + intercept[IllegalStateException](cleanerManager.doneCleaning(topicPartition, log.dir, 1)) - cleanerManager.setCleaningState(tp, LogCleaningPaused(1)) - intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1)) + cleanerManager.setCleaningState(topicPartition, LogCleaningPaused(1)) + intercept[IllegalStateException](cleanerManager.doneCleaning(topicPartition, log.dir, 1)) - cleanerManager.setCleaningState(tp, LogCleaningInProgress) - cleanerManager.doneCleaning(tp, log.dir, 1) - assertTrue(cleanerManager.cleaningState(tp).isEmpty) - assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty) + cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress) + cleanerManager.doneCleaning(topicPartition, log.dir, 1) + assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty) + assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty) - cleanerManager.setCleaningState(tp, LogCleaningAborted) - cleanerManager.doneCleaning(tp, log.dir, 1) - assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get) - assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty) + cleanerManager.setCleaningState(topicPartition, LogCleaningAborted) + cleanerManager.doneCleaning(topicPartition, log.dir, 1) + assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get) + assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty) } @Test @@ -337,7 +463,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete) val cleanerManager: LogCleanerManager = createCleanerManager(log) - val tp = new TopicPartition("log", 0) intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp))) @@ -352,21 +477,27 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { cleanerManager.setCleaningState(tp, LogCleaningAborted) cleanerManager.doneDeleting(Seq(tp)) assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get) - } private def createCleanerManager(log: Log): LogCleanerManager = { val logs = new Pool[TopicPartition, Log]() - logs.put(new TopicPartition("log", 0), log) - val cleanerManager = new LogCleanerManager(Array(logDir), logs, null) - cleanerManager + logs.put(topicPartition, log) + createCleanerManager(logs) + } + + private def createCleanerManager(pool: Pool[TopicPartition, Log], toMock: Boolean = false): LogCleanerManager = { + if (toMock) + new LogCleanerManagerMock(Array(logDir), pool, null) + else + new LogCleanerManager(Array(logDir), pool, null) } - private def createLog(segmentSize: Int, cleanupPolicy: String): Log = { + private def createLog(segmentSize: Int, cleanupPolicy: String, segmentsCount: Int = 0): Log = { val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer) logProps.put(LogConfig.RetentionMsProp, 1: Integer) logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests val config = LogConfig(logProps) val partitionDir = new File(logDir, "log-0") @@ -380,6 +511,22 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { maxProducerIdExpirationMs = 60 * 60 * 1000, producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, logDirFailureChannel = new LogDirFailureChannel(10)) + for (i <- 0 until segmentsCount) { + val startOffset = i * 10 + val endOffset = startOffset + 10 + val segment = LogUtils.createSegment(startOffset, logDir) + var lastTimestamp = 0L + val records = (startOffset until endOffset).map { offset => + val currentTimestamp = time.milliseconds() + if (offset == endOffset - 1) + lastTimestamp = currentTimestamp + + new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes) + } + + segment.append(endOffset, lastTimestamp, endOffset, MemoryRecords.withRecords(CompressionType.NONE, records:_*)) + log.addSegment(segment) + } log } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala similarity index 91% copy from core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala copy to core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 64e8b38..266bb39 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -35,19 +35,19 @@ import org.junit.runners.Parameterized.Parameters import scala.Seq import scala.collection._ -import scala.util.Random /** * This is an integration test that tests the fully integrated log cleaner */ @RunWith(value = classOf[Parameterized]) -class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest { +class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest { - val codec = CompressionType.forName(compressionCodec) + val codec: CompressionType = CompressionType.forName(compressionCodec) val time = new MockTime() - var counter = 0 + val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)) + @Test def cleanerTest() { val largeMessageKey = 20 @@ -129,18 +129,6 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read)) } - // returns (value, ByteBufferMessageSet) - private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = { - def messageValue(length: Int): String = { - val random = new Random(0) - new String(random.alphanumeric.take(length).toArray) - } - val value = messageValue(128) - val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes, - magicValue = messageFormatVersion) - (value, messageSet) - } - @Test def testCleanerWithMessageFormatV0(): Unit = { val largeMessageKey = 20 @@ -307,22 +295,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle } } - private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, - startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = { - for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { - val value = counter.toString - val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec, - key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0) - counter += 1 - (key, value, appendInfo.firstOffset.get) - } - } - private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = { val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { val payload = counter.toString - counter += 1 + incCounter() (key, payload) } @@ -338,7 +315,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle } -object LogCleanerIntegrationTest { +object LogCleanerParameterizedIntegrationTest { @Parameters def parameters: java.util.Collection[Array[String]] = { val list = new java.util.ArrayList[Array[String]]() diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 8976c68..40b6874 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -40,12 +40,7 @@ class LogSegmentTest { indexIntervalBytes: Int = 10, maxSegmentMs: Int = Int.MaxValue, time: Time = Time.SYSTEM): LogSegment = { - val ms = FileRecords.open(Log.logFile(logDir, offset)) - val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000) - val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500) - val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset)) - val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs, - maxSegmentBytes = Int.MaxValue, time) + val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time) segments += seg seg } diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala new file mode 100644 index 0000000..eb21895 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io.File + +import org.apache.kafka.common.record.FileRecords +import org.apache.kafka.common.utils.Time + +object LogUtils { + /** + * Create a segment with the given base offset + */ + def createSegment(offset: Long, + logDir: File, + indexIntervalBytes: Int = 10, + maxSegmentMs: Int = Int.MaxValue, + time: Time = Time.SYSTEM): LogSegment = { + val ms = FileRecords.open(Log.logFile(logDir, offset)) + val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000) + val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500) + val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset)) + + new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time) + } +}