This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 90db4f47d6 KAFKA-13773: catch kafkaStorageException to avoid broker 
shutdown directly (#12136)
90db4f47d6 is described below

commit 90db4f47d6394fd4b90b934bf0bb2d7965cd73a6
Author: Luke Chen <[email protected]>
AuthorDate: Thu Jun 2 14:15:51 2022 +0800

    KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly 
(#12136)
    
    When LogManager starts up and runs loadLogs, we expect to catch any IOException 
(e.g. an out-of-space error) and mark the log dir as offline. Later, we handle 
the offline logDir in ReplicaManager, so that the cleanShutdown file won't be 
created when all logDirs are offline. The reason the broker shut down with a 
cleanShutdown file after a full disk is that, during loadLogs and log 
recovery, we write the leader-epoch-checkpoint file. And if any IOException 
is thrown, we'll wrap it as KafkaStor [...]
    
    This PR fixes the issue by catching any KafkaStorageException that has an 
IOException cause during loadLogs, and marking the logDir as offline so that 
the ReplicaManager can handle the offline logDirs.
    
    Reviewers: Jun Rao <[email protected]>, Alok Thatikunta 
<[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 14 ++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  8 ++-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 84 +++++++++++++++++-----
 .../unit/kafka/server/ServerShutdownTest.scala     | 27 +++++--
 4 files changed, 107 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index b81f6a928a..93d1eee740 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -317,6 +317,11 @@ class LogManager(logDirs: Seq[File],
     val jobs = ArrayBuffer.empty[Seq[Future[_]]]
     var numTotalLogs = 0
 
+    def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
+      offlineDirs.add((logDirAbsolutePath, e))
+      error(s"Error while loading log dir $logDirAbsolutePath", e)
+    }
+
     for (dir <- liveLogDirs) {
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
@@ -375,8 +380,10 @@ class LogManager(logDirs: Seq[File],
                 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
             } catch {
               case e: IOException =>
-                offlineDirs.add((logDirAbsolutePath, e))
-                error(s"Error while loading log dir $logDirAbsolutePath", e)
+                handleIOException(logDirAbsolutePath, e)
+              case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
+                // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
+                // And while converting IOException to KafkaStorageException, 
we've already handled the exception. So we can ignore it here.
             }
           }
           runnable
@@ -385,8 +392,7 @@ class LogManager(logDirs: Seq[File],
         jobs += jobsForDir.map(pool.submit)
       } catch {
         case e: IOException =>
-          offlineDirs.add((logDirAbsolutePath, e))
-          error(s"Error while loading log dir $logDirAbsolutePath", e)
+          handleIOException(logDirAbsolutePath, e)
       }
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 67013d3391..5e66fd7f05 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -830,7 +830,13 @@ class KafkaServer(
   private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
     for (logDir <- config.logDirs if logManager.isLogDirOnline(new 
File(logDir).getAbsolutePath)) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
-      checkpoint.write(brokerMetadata.toProperties)
+      try {
+        checkpoint.write(brokerMetadata.toProperties)
+      } catch {
+        case e: IOException =>
+          val dirPath = checkpoint.file.getAbsolutePath
+          logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while 
writing meta.properties to $dirPath", e)
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index a6b114320a..4f659da7dd 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.{BufferedWriter, File, FileWriter}
+import java.io.{BufferedWriter, File, FileWriter, IOException}
 import java.nio.ByteBuffer
 import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util.Properties
@@ -27,9 +27,11 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, 
KafkaConfig, LogDirFailure
 import kafka.server.metadata.MockConfigRepository
 import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, 
DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, 
TimestampType}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -61,6 +63,12 @@ class LogLoaderTest {
     Utils.delete(tmpDir)
   }
 
+  object ErrorTypes extends Enumeration {
+    type Errors = Value
+    val IOException, RuntimeException, 
KafkaStorageExceptionWithIOExceptionCause,
+    KafkaStorageExceptionWithoutIOExceptionCause = Value
+  }
+
   @Test
   def testLogRecoveryIsCalledUponBrokerCrash(): Unit = {
     // LogManager must realize correctly if the last shutdown was not clean 
and the logs need
@@ -73,15 +81,19 @@ class LogLoaderTest {
     var log: UnifiedLog = null
     val time = new MockTime()
     var cleanShutdownInterceptedValue = false
-    case class SimulateError(var hasError: Boolean = false)
+    case class SimulateError(var hasError: Boolean = false, var errorType: 
ErrorTypes.Errors = ErrorTypes.RuntimeException)
     val simulateError = SimulateError()
+    val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
 
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
 
     // Create a LogManager with some overridden methods to facilitate 
interception of clean shutdown
-    // flag and to inject a runtime error
-    def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], 
simulateError: SimulateError): LogManager = {
+    // flag and to inject an error
+    def interceptedLogManager(logConfig: LogConfig,
+                              logDirs: Seq[File],
+                              logDirFailureChannel: LogDirFailureChannel
+                             ): LogManager = {
       new LogManager(
         logDirs = logDirs.map(_.getAbsoluteFile),
         initialOfflineDirs = Array.empty[File],
@@ -98,7 +110,7 @@ class LogLoaderTest {
         interBrokerProtocolVersion = config.interBrokerProtocolVersion,
         scheduler = time.scheduler,
         brokerTopicStats = new BrokerTopicStats(),
-        logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+        logDirFailureChannel = logDirFailureChannel,
         time = time,
         keepPartitionMetadataFile = config.usesTopicId) {
 
@@ -106,7 +118,16 @@ class LogLoaderTest {
                              logStartOffsets: Map[TopicPartition, Long], 
defaultConfig: LogConfig,
                              topicConfigs: Map[String, LogConfig]): UnifiedLog 
= {
           if (simulateError.hasError) {
-            throw new RuntimeException("Simulated error")
+            simulateError.errorType match {
+              case ErrorTypes.KafkaStorageExceptionWithIOExceptionCause =>
+                throw new KafkaStorageException(new IOException("Simulated 
Kafka storage error with IOException cause"))
+              case ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause =>
+                throw new KafkaStorageException("Simulated Kafka storage error 
without IOException cause")
+              case ErrorTypes.IOException =>
+                throw new IOException("Simulated IO error")
+              case _ =>
+                throw new RuntimeException("Simulated Runtime error")
+            }
           }
           cleanShutdownInterceptedValue = hadCleanShutdown
           val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
@@ -132,10 +153,24 @@ class LogLoaderTest {
       }
     }
 
+    def initializeLogManagerForSimulatingErrorTest(logDirFailureChannel: 
LogDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+                                                  ): (LogManager, Executable) 
= {
+      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, 
logDirFailureChannel)
+      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = 
None)
+
+      
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log 
dir should not be offline before load logs")
+
+      val runLoadLogs: Executable = () => {
+        val defaultConfig = logManager.currentDefaultConfig
+        logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+      }
+
+      (logManager, runLoadLogs)
+    }
+
     val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile)
     locally {
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, 
simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = 
None)
+      val (logManager, _) = initializeLogManagerForSimulatingErrorTest()
 
       // Load logs after a clean shutdown
       Files.createFile(cleanShutdownFile.toPath)
@@ -156,22 +191,37 @@ class LogLoaderTest {
     }
 
     locally {
-      simulateError.hasError = true
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, 
simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = 
None)
+      val (logManager, runLoadLogs) = 
initializeLogManagerForSimulatingErrorTest(logDirFailureChannel)
 
-      // Simulate error
-      assertThrows(classOf[RuntimeException], () => {
-        val defaultConfig = logManager.currentDefaultConfig
-        logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-      })
+      // Simulate Runtime error
+      simulateError.hasError = true
+      simulateError.errorType = ErrorTypes.RuntimeException
+      assertThrows(classOf[RuntimeException], runLoadLogs)
       assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not 
have existed")
+      
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log 
dir should not turn offline when Runtime Exception thrown")
+
+      // Simulate Kafka storage error with IOException cause
+      // in this case, the logDir will be added into offline list before 
KafkaStorageThrown. So we don't verify it here
+      simulateError.errorType = 
ErrorTypes.KafkaStorageExceptionWithIOExceptionCause
+      assertDoesNotThrow(runLoadLogs, "KafkaStorageException with IOException 
cause should be caught and handled")
+
+      // Simulate Kafka storage error without IOException cause
+      simulateError.errorType = 
ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause
+      assertThrows(classOf[KafkaStorageException], runLoadLogs, "should throw 
exception when KafkaStorageException without IOException cause")
+      
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log 
dir should not turn offline when KafkaStorageException without IOException 
cause thrown")
+
+      // Simulate IO error
+      simulateError.errorType = ErrorTypes.IOException
+      assertDoesNotThrow(runLoadLogs, "IOException should be caught and 
handled")
+      
assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the 
log dir should turn offline after IOException thrown")
+
       // Do not simulate error on next call to LogManager#loadLogs. LogManager 
must understand that log had unclean shutdown the last time.
       simulateError.hasError = false
       cleanShutdownInterceptedValue = true
       val defaultConfig = logManager.currentDefaultConfig
       logManager.loadLogs(defaultConfig, 
logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean 
shutdown flag")
+      logManager.shutdown()
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 16d17d2fd2..7e9fb1e726 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.{CoreUtils, Exit, TestUtils}
 
 import java.io.{DataInputStream, File}
 import java.net.ServerSocket
@@ -30,7 +30,6 @@ import kafka.zookeeper.ZooKeeperClientTimeoutException
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -41,6 +40,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -158,14 +158,33 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): 
Unit = {
     createTopic(topic)
     shutdownBroker()
     config.logDirs.foreach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, 
TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
+
+    val expectedStatusCode = Some(1)
+    @volatile var receivedStatusCode = Option.empty[Int]
+    @volatile var hasHaltProcedureCalled = false
+    Exit.setHaltProcedure((statusCode, _) => {
+      hasHaltProcedureCalled = true
+      receivedStatusCode = Some(statusCode)
+    }.asInstanceOf[Nothing])
+
+    try {
+      val recreateBrokerExec: Executable = () => recreateBroker(true)
+      // this startup should fail with no online log dir (due to corrupted 
log), and exit directly without throwing exception
+      assertDoesNotThrow(recreateBrokerExec)
+      // JVM should exit with status code 1
+      TestUtils.waitUntilTrue(() => hasHaltProcedureCalled == true && 
expectedStatusCode == receivedStatusCode,
+        s"Expected to halt directly with the expected status 
code:${expectedStatusCode.get}, " +
+          s"but got hasHaltProcedureCalled: $hasHaltProcedureCalled and 
received status code: ${receivedStatusCode.orNull}")
+    } finally {
+      Exit.resetHaltProcedure()
+    }
   }
 
   @ParameterizedTest

Reply via email to