This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 6624d070348 KAFKA-15375: fix broken clean shutdown detection logic in LogManager
6624d070348 is described below

commit 6624d070348fba764d9cefd4f110d98fe8944d8d
Author: Vincent Jiang <[email protected]>
AuthorDate: Wed Aug 30 09:19:24 2023 -0700

    KAFKA-15375: fix broken clean shutdown detection logic in LogManager (#14239)
    
    When running in kraft mode, LogManager.startup is called in a different thread than the main broker
    startup thread (by BrokerMetadataPublisher, when the first metadata update is received). If a fatal
    error happens during broker startup, before LogManager.startup is completed, LogManager.shutdown may
    mark log dirs as clean shutdown improperly.
    
    This PR includes the following changes:
    1. During LogManager startup:
      - track the hadCleanShutdown flag for each log dir
      - track the loadLogsCompleted status for each log dir
    2. During LogManager shutdown:
      - do not write the clean shutdown marker file for log dirs that have
        hadCleanShutdown==false and loadLogsCompleted==false, as sketched below
    
    Reviewers: Colin P. McCabe <[email protected]>
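
    A condensed sketch of the guard described above, with names taken from the
    diff below (an illustration distilled from the patch, not the verbatim code):

        // Startup: remember, per log dir, whether the clean shutdown marker was
        // present, and whether every log in the dir finished loading.
        hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
        loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)

        // Shutdown: write the marker only if the dir was clean to begin with,
        // or recovery of all of its logs completed before shutdown.
        if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
            loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
          CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
        }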
---
 core/src/main/scala/kafka/log/LogManager.scala     | 29 ++++++++++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 53 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d31046092f3..52de8cb74a2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -125,6 +125,12 @@ class LogManager(logDirs: Seq[File],
     logDirsSet
   }
 
+  // A map that stores hadCleanShutdown flag for each log dir.
+  private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
+
+  // A map that tells whether all logs in a log dir had been loaded or not at startup time.
+  private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()
+
   @volatile private var _cleaner: LogCleaner = _
   private[kafka] def cleaner: LogCleaner = _cleaner
 
@@ -369,6 +375,7 @@ class LogManager(logDirs: Seq[File],
           // log recovery itself is being performed by `Log` class during initialization
           info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
         }
+        hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
 
         var recoveryPoints = Map[TopicPartition, Long]()
         try {
@@ -391,7 +398,8 @@ class LogManager(logDirs: Seq[File],
         val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
           logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
         numTotalLogs += logsToLoad.length
-        numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
+        numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
+        loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
 
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
@@ -409,13 +417,18 @@ class LogManager(logDirs: Seq[File],
                 // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
             } finally {
               val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-              val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath)
+              val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
               val currentNumLoaded = logsToLoad.length - remainingLogs
               log match {
                 case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
                   s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
                 case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
               }
+
+              if (remainingLogs == 0) {
+                // loadLog is completed for all logs under the logDir, mark it.
+                loadLogsCompletedFlags.put(logDirAbsolutePath, true)
+              }
             }
           }
           runnable
@@ -612,9 +625,15 @@ class LogManager(logDirs: Seq[File],
           debug(s"Updating log start offsets at $dir")
           checkpointLogStartOffsetsInDir(dir, logs)
 
-          // mark that the shutdown was clean by creating marker file
-          debug(s"Writing clean shutdown marker at $dir")
-          CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+          // mark that the shutdown was clean by creating marker file for log dirs that:
+          //  1. had clean shutdown marker file; or
+          //  2. had no clean shutdown marker file, but all logs under it have been recovered at startup time
+          val logDirAbsolutePath = dir.getAbsolutePath
+          if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
+              loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
+            debug(s"Writing clean shutdown marker at $dir")
+            CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+          }
         }
       }
     } finally {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a7798418ed8..dc185fd1c0a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -134,6 +134,59 @@ class LogManagerTest {
     }
   }
 
+  /*
+   * Test that LogManager.shutdown() doesn't create clean shutdown file for a log directory that has not completed
+   * recovery.
+   */
+  @Test
+  def testCleanShutdownFileWhenShutdownCalledBeforeStartupComplete(): Unit = {
+    // 1. create two logs under logDir
+    val topicPartition0 = new TopicPartition(name, 0)
+    val topicPartition1 = new TopicPartition(name, 1)
+    val log0 = logManager.getOrCreateLog(topicPartition0, topicId = None)
+    val log1 = logManager.getOrCreateLog(topicPartition1, topicId = None)
+    val logFile0 = new File(logDir, name + "-0")
+    val logFile1 = new File(logDir, name + "-1")
+    assertTrue(logFile0.exists)
+    assertTrue(logFile1.exists)
+
+    log0.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log0.takeProducerSnapshot()
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+
+    // 2. simulate unclean shutdown by deleting clean shutdown marker file
+    logManager.shutdown()
+    assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+
+    // 3. create a new LogManager and start it in a different thread
+    @volatile var loadLogCalled = 0
+    logManager = spy(createLogManager())
+    doAnswer { invocation =>
+      // intercept LogManager.loadLog to sleep 5 seconds so that there is enough time to call LogManager.shutdown
+      // before LogManager.startup completes.
+      Thread.sleep(5000)
+      invocation.callRealMethod().asInstanceOf[UnifiedLog]
+      loadLogCalled = loadLogCalled + 1
+    }.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
+      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
+
+    val t = new Thread() {
+      override def run(): Unit = { logManager.startup(Set.empty) }
+    }
+    t.start()
+
+    // 4. shutdown LogManager after the first log is loaded but before the second log is loaded
+    TestUtils.waitUntilTrue(() => loadLogCalled == 1,
+      "Timed out waiting for only the first log to be loaded")
+    logManager.shutdown()
+    logManager = null
+
+    // 5. verify that CleanShutdownFile is not created under logDir
+    assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+  }
+
   /**
   * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
   * The LogManager is configured with one invalid log directory which should be marked as offline.
