This is an automated email from the ASF dual-hosted git repository. gwenshap pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 36b2733 KAFKA-10282; Remove Log metrics immediately when deleting log 36b2733 is described below commit 36b273370dd840cc2bb6307f311f8952b886b323 Author: Bob Barrett <bob.barr...@confluent.io> AuthorDate: Thu Jul 30 19:11:56 2020 -0700 KAFKA-10282; Remove Log metrics immediately when deleting log Currently, we remove the Log metrics only when the asynchronous deletion of the log's files is carried out. However, we attempt to register the metrics immediately upon log creation. If a Log object is re-created for a partition that is pending deletion (because a topic was quickly re-created or because a partition was moved off and back onto a broker), the registration of the new metrics can happen before the asynchronous deletion. In this case, the metrics are removed after the second registration, le [...] To fix this, this patch changes the log deletion behavior to remove the metrics when the log is first marked for deletion, rather than when the files are deleted. This removes the window in which metrics registration can occur before metrics removal. This is justifiable because the log should be logically deleted when a delete request or partition movement finishes, rather than when the files are actually removed. Tested with unit tests. 
Author: Bob Barrett <bob.barr...@confluent.io> Reviewers: David Jacot, Dhruvil Shah, Vikas Singh, Gwen Shapira Closes #9054 from bob-barrett/KAFKA-10282 --- core/src/main/scala/kafka/log/Log.scala | 1 - core/src/main/scala/kafka/log/LogManager.scala | 92 ++++++++++++---------- .../test/scala/unit/kafka/log/LogManagerTest.scala | 77 +++++++++++++++++- core/src/test/scala/unit/kafka/log/LogTest.scala | 21 ----- 4 files changed, 126 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a58998e..7dccd5f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2051,7 +2051,6 @@ class Log(@volatile private var _dir: File, maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") { lock synchronized { checkIfMemoryMappedBufferClosed() - removeLogMetrics() producerExpireCheck.cancel(true) removeAndDeleteSegments(logSegments, asyncDelete = false) leaderEpochCache.foreach(_.clear()) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index e2966b0..80cd279 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -199,27 +199,22 @@ class LogManager(logDirs: Seq[File], if (cleaner != null) cleaner.handleLogDirFailure(dir) - val offlineCurrentTopicPartitions = currentLogs.collect { - case (tp, log) if log.parentDir == dir => tp - } - offlineCurrentTopicPartitions.foreach { topicPartition => { - val removedLog = currentLogs.remove(topicPartition) - if (removedLog != null) { - removedLog.closeHandlers() - removedLog.removeLogMetrics() + def removeOfflineLogs(logs: Pool[TopicPartition, Log]): Iterable[TopicPartition] = { + val offlineTopicPartitions: Iterable[TopicPartition] = logs.collect { + case (tp, log) if log.parentDir == dir => tp } - }} + offlineTopicPartitions.foreach { topicPartition => { + val 
removedLog = removeLogAndMetrics(logs, topicPartition) + removedLog.foreach { + log => log.closeHandlers() + } + }} - val offlineFutureTopicPartitions = futureLogs.collect { - case (tp, log) if log.parentDir == dir => tp + offlineTopicPartitions } - offlineFutureTopicPartitions.foreach { topicPartition => { - val removedLog = futureLogs.remove(topicPartition) - if (removedLog != null) { - removedLog.closeHandlers() - removedLog.removeLogMetrics() - } - }} + + val offlineCurrentTopicPartitions = removeOfflineLogs(currentLogs) + val offlineFutureTopicPartitions = removeOfflineLogs(futureLogs) warn(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} are offline and " + s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir") @@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File], val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty) checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) + sourceLog.removeLogMetrics() addLogToBeDeleted(sourceLog) } catch { case e: KafkaStorageException => @@ -957,32 +953,34 @@ class LogManager(logDirs: Seq[File], */ def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false, - checkpoint: Boolean = true): Log = { - val removedLog: Log = logCreationOrDeletionLock synchronized { - if (isFuture) - futureLogs.remove(topicPartition) - else - currentLogs.remove(topicPartition) + checkpoint: Boolean = true): Option[Log] = { + val removedLog: Option[Log] = logCreationOrDeletionLock synchronized { + removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition) } - if (removedLog != null) { - // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. 
- if (cleaner != null && !isFuture) { - cleaner.abortCleaning(topicPartition) - if (checkpoint) - cleaner.updateCheckpoints(removedLog.parentDirFile) - } - removedLog.renameDir(Log.logDeleteDirName(topicPartition)) - if (checkpoint) { - val logDir = removedLog.parentDirFile - val logsToCheckpoint = logsInDir(logDir) - checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty) - checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) - } - addLogToBeDeleted(removedLog) - info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") - } else if (offlineLogDirs.nonEmpty) { - throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}") + removedLog match { + case Some(removedLog) => + // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. 
+ if (cleaner != null && !isFuture) { + cleaner.abortCleaning(topicPartition) + if (checkpoint) + cleaner.updateCheckpoints(removedLog.parentDirFile) + } + removedLog.renameDir(Log.logDeleteDirName(topicPartition)) + if (checkpoint) { + val logDir = removedLog.parentDirFile + val logsToCheckpoint = logsInDir(logDir) + checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty) + checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) + } + addLogToBeDeleted(removedLog) + info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") + + case None => + if (offlineLogDirs.nonEmpty) { + throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}") + } } + removedLog } @@ -1151,6 +1149,16 @@ class LogManager(logDirs: Seq[File], } } } + + private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: TopicPartition): Option[Log] = { + val removedLog = logs.remove(tp) + if (removedLog != null) { + removedLog.removeLogMetrics() + Some(removedLog) + } else { + None + } + } } object LogManager { diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 05d7d71..767e1be 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -20,6 +20,8 @@ package kafka.log import java.io._ import java.util.{Collections, Properties} +import com.yammer.metrics.core.MetricName +import kafka.metrics.KafkaYammerMetrics import kafka.server.{FetchDataInfo, FetchLogEnd} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ @@ -33,6 +35,7 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{doAnswer, spy} import scala.collection.mutable +import 
scala.jdk.CollectionConverters._ import scala.util.{Failure, Try} class LogManagerTest { @@ -399,7 +402,7 @@ class LogManagerTest { val txnIndexName = activeSegment.txnIndex.file.getName val indexFilesOnDiskBeforeDelete = activeSegment.log.file.getParentFile.listFiles.filter(_.getName.endsWith("index")) - val removedLog = logManager.asyncDelete(new TopicPartition(name, 0)) + val removedLog = logManager.asyncDelete(new TopicPartition(name, 0)).get val removedSegment = removedLog.activeSegment val indexFilesAfterDelete = Seq(removedSegment.lazyOffsetIndex.file, removedSegment.lazyTimeIndex.file, removedSegment.txnIndex.file) @@ -563,4 +566,76 @@ class LogManagerTest { logManager.topicConfigUpdated("test-topic") assertTrue(logManager.partitionsInitializing.isEmpty) } + + @Test + def testMetricsExistWhenLogIsRecreatedBeforeDeletion(): Unit = { + val topicName = "metric-test" + def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala. + filter(metric => metric.getType == "Log" && metric.getScope.contains(topicName)) + + val tp = new TopicPartition(topicName, 0) + val metricTag = s"topic=${tp.topic},partition=${tp.partition}" + + def verifyMetrics(): Unit = { + assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size) + logMetrics.foreach { metric => + assertTrue(metric.getMBeanName.contains(metricTag)) + } + } + + // Create the Log and assert that the metrics are present + logManager.getOrCreateLog(tp, () => logConfig) + verifyMetrics() + + // Trigger the deletion and assert that the metrics have been removed + val removedLog = logManager.asyncDelete(tp).get + assertTrue(logMetrics.isEmpty) + + // Recreate the Log and assert that the metrics are present + logManager.getOrCreateLog(tp, () => logConfig) + verifyMetrics() + + // Advance time past the file deletion delay and assert that the removed log has been deleted but the metrics + // are still present + time.sleep(logConfig.fileDeleteDelayMs + 1) + 
assertTrue(removedLog.logSegments.isEmpty) + verifyMetrics() + } + + @Test + def testMetricsAreRemovedWhenMovingCurrentToFutureLog(): Unit = { + val dir1 = TestUtils.tempDir() + val dir2 = TestUtils.tempDir() + logManager = createLogManager(Seq(dir1, dir2)) + logManager.startup() + + val topicName = "future-log" + def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala. + filter(metric => metric.getType == "Log" && metric.getScope.contains(topicName)) + + val tp = new TopicPartition(topicName, 0) + val metricTag = s"topic=${tp.topic},partition=${tp.partition}" + + def verifyMetrics(logCount: Int): Unit = { + assertEquals(LogMetricNames.allMetricNames.size * logCount, logMetrics.size) + logMetrics.foreach { metric => + assertTrue(metric.getMBeanName.contains(metricTag)) + } + } + + // Create the current and future logs and verify that metrics are present for both current and future logs + logManager.maybeUpdatePreferredLogDir(tp, dir1.getAbsolutePath) + logManager.getOrCreateLog(tp, () => logConfig) + logManager.maybeUpdatePreferredLogDir(tp, dir2.getAbsolutePath) + logManager.getOrCreateLog(tp, () => logConfig, isFuture = true) + verifyMetrics(2) + + // Replace the current log with the future one and verify that only one set of metrics are present + logManager.replaceCurrentWithFutureLog(tp) + verifyMetrics(1) + + // Trigger the deletion of the former current directory and verify that one set of metrics is still present + time.sleep(logConfig.fileDeleteDelayMs + 1) + verifyMetrics(1) + } } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index f89c3be..43061b6 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -4327,27 +4327,6 @@ class LogTest { assertEquals(1, log.numberOfSegments) } - @Test - def testMetricsRemovedOnLogDeletion(): Unit = { - TestUtils.clearYammerMetrics() - - val 
logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024) - val log = createLog(logDir, logConfig) - val topicPartition = Log.parseTopicPartitionName(logDir) - val metricTag = s"topic=${topicPartition.topic},partition=${topicPartition.partition}" - - val logMetrics = metricsKeySet.filter(_.getType == "Log") - assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size) - logMetrics.foreach { metric => - assertTrue(metric.getMBeanName.contains(metricTag)) - } - - // Delete the log and validate that corresponding metrics were removed. - log.delete() - val logMetricsAfterDeletion = metricsKeySet.filter(_.getType == "Log") - assertTrue(logMetricsAfterDeletion.isEmpty) - } - private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {