This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa33fb4d3c KAFKA-13773: catch KafkaStorageException to avoid broker shutdown directly (#12136)
fa33fb4d3c is described below
commit fa33fb4d3ced39ee985a15c1e44650542a3b47d2
Author: Luke Chen <[email protected]>
AuthorDate: Thu Jun 2 14:15:51 2022 +0800
KAFKA-13773: catch KafkaStorageException to avoid broker shutdown directly (#12136)
When LogManager starts up and loads logs, we expect to catch any IOException
(e.g. an out-of-space error) and turn the log dir offline. Later, we handle the
offline logDir in ReplicaManager, so that the cleanShutdown file won't be created
when all logDirs are offline. The reason the broker shut down with a cleanShutdown
file after the disk filled up is that during loadLogs, while doing log recovery,
we write the leader-epoch-checkpoint file, and if any IOException is thrown, we
wrap it as a KafkaStorageException [...]

This PR fixes the issue by catching KafkaStorageException with an IOException
cause during loadLogs and marking the logDir as offline, so that the
ReplicaManager can handle the offline logDirs.
Reviewers: Jun Rao <[email protected]>, Alok Thatikunta <[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 14 +++-
core/src/main/scala/kafka/server/KafkaServer.scala | 8 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 85 +++++++++++++++++-----
.../unit/kafka/server/ServerShutdownTest.scala | 27 ++++++-
4 files changed, 107 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c79faeba3b..bdc7ffd74d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,6 +318,11 @@ class LogManager(logDirs: Seq[File],
val jobs = ArrayBuffer.empty[Seq[Future[_]]]
var numTotalLogs = 0
+ def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
+ offlineDirs.add((logDirAbsolutePath, e))
+ error(s"Error while loading log dir $logDirAbsolutePath", e)
+ }
+
for (dir <- liveLogDirs) {
val logDirAbsolutePath = dir.getAbsolutePath
var hadCleanShutdown: Boolean = false
@@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File],
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
-             offlineDirs.add((logDirAbsolutePath, e))
-             error(s"Error while loading log dir $logDirAbsolutePath", e)
+             handleIOException(logDirAbsolutePath, e)
+           case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
+             // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
+             // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
          }
}
runnable
@@ -386,8 +393,7 @@ class LogManager(logDirs: Seq[File],
jobs += jobsForDir.map(pool.submit)
} catch {
case e: IOException =>
- offlineDirs.add((logDirAbsolutePath, e))
- error(s"Error while loading log dir $logDirAbsolutePath", e)
+ handleIOException(logDirAbsolutePath, e)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 074fc497e0..64cf88d4ee 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -830,7 +830,13 @@ class KafkaServer(
  private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
    for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
      val checkpoint = brokerMetadataCheckpoints(logDir)
-     checkpoint.write(brokerMetadata.toProperties)
+     try {
+       checkpoint.write(brokerMetadata.toProperties)
+     } catch {
+       case e: IOException =>
+         val dirPath = checkpoint.file.getAbsolutePath
+         logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)
+     }
    }
  }
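The KafkaServer change above applies the same principle to metadata checkpointing:
an IOException from the checkpoint write is routed into the log-dir failure channel
instead of propagating. Below is a simplified, self-contained sketch of that control
flow; the FailureChannel and Checkpoint types are stand-ins for Kafka's
LogDirFailureChannel and checkpoint file classes, not the real implementations.

    import java.io.{File, IOException}
    import scala.collection.concurrent.TrieMap

    // Stand-in for kafka.server.LogDirFailureChannel: remembers dirs that hit IO errors.
    class FailureChannel {
      private val offline = TrieMap.empty[String, IOException]
      def maybeAddOfflineLogDir(dir: String, msg: => String, e: IOException): Unit = {
        println(msg)
        offline.putIfAbsent(dir, e)
      }
      def hasOfflineLogDir(dir: String): Boolean = offline.contains(dir)
    }

    // Stand-in checkpoint writer that fails as a full disk would.
    class Checkpoint(val file: File) {
      def write(content: String): Unit =
        throw new IOException(s"No space left on device: ${file.getPath}")
    }

    // The pattern from checkpointBrokerMetadata: catch the IOException and mark
    // the dir offline rather than letting the write failure escape.
    def checkpointSafely(checkpoint: Checkpoint, content: String, channel: FailureChannel): Unit =
      try checkpoint.write(content)
      catch {
        case e: IOException =>
          val dirPath = checkpoint.file.getAbsolutePath
          channel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)
      }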
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index fba8e9bad8..9d7a3b2023 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -17,21 +17,22 @@
package kafka.log
-import java.io.{BufferedWriter, File, FileWriter}
+import java.io.{BufferedWriter, File, FileWriter, IOException}
import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.Properties
-
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
import kafka.server.metadata.MockConfigRepository
import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -63,6 +64,12 @@ class LogLoaderTest {
Utils.delete(tmpDir)
}
+  object ErrorTypes extends Enumeration {
+    type Errors = Value
+    val IOException, RuntimeException, KafkaStorageExceptionWithIOExceptionCause,
+      KafkaStorageExceptionWithoutIOExceptionCause = Value
+  }
+
@Test
def testLogRecoveryIsCalledUponBrokerCrash(): Unit = {
    // LogManager must realize correctly if the last shutdown was not clean and the logs need
@@ -75,15 +82,19 @@ class LogLoaderTest {
    var log: UnifiedLog = null
    val time = new MockTime()
    var cleanShutdownInterceptedValue = false
-   case class SimulateError(var hasError: Boolean = false)
+   case class SimulateError(var hasError: Boolean = false, var errorType: ErrorTypes.Errors = ErrorTypes.RuntimeException)
    val simulateError = SimulateError()
+   val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
    val maxTransactionTimeoutMs = 5 * 60 * 1000
    val maxProducerIdExpirationMs = 60 * 60 * 1000
    // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
-   // flag and to inject a runtime error
-   def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = {
+   // flag and to inject an error
+   def interceptedLogManager(logConfig: LogConfig,
+                             logDirs: Seq[File],
+                             logDirFailureChannel: LogDirFailureChannel
+                            ): LogManager = {
      new LogManager(
        logDirs = logDirs.map(_.getAbsoluteFile),
        initialOfflineDirs = Array.empty[File],
@@ -100,7 +111,7 @@ class LogLoaderTest {
interBrokerProtocolVersion = config.interBrokerProtocolVersion,
scheduler = time.scheduler,
brokerTopicStats = new BrokerTopicStats(),
- logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+ logDirFailureChannel = logDirFailureChannel,
time = time,
keepPartitionMetadataFile = config.usesTopicId) {
@@ -108,7 +119,16 @@ class LogLoaderTest {
                              logStartOffsets: Map[TopicPartition, Long],
                              defaultConfig: LogConfig,
                              topicConfigs: Map[String, LogConfig]): UnifiedLog = {
        if (simulateError.hasError) {
-         throw new RuntimeException("Simulated error")
+         simulateError.errorType match {
+           case ErrorTypes.KafkaStorageExceptionWithIOExceptionCause =>
+             throw new KafkaStorageException(new IOException("Simulated Kafka storage error with IOException cause"))
+           case ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause =>
+             throw new KafkaStorageException("Simulated Kafka storage error without IOException cause")
+           case ErrorTypes.IOException =>
+             throw new IOException("Simulated IO error")
+           case _ =>
+             throw new RuntimeException("Simulated Runtime error")
+         }
        }
        cleanShutdownInterceptedValue = hadCleanShutdown
        val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
@@ -134,10 +154,24 @@
      }
    }
+   def initializeLogManagerForSimulatingErrorTest(logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+                                                 ): (LogManager, Executable) = {
+     val logManager: LogManager = interceptedLogManager(logConfig, logDirs, logDirFailureChannel)
+     log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+
+     assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not be offline before load logs")
+
+     val runLoadLogs: Executable = () => {
+       val defaultConfig = logManager.currentDefaultConfig
+       logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+     }
+
+     (logManager, runLoadLogs)
+   }
+
    val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile)
    locally {
-     val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-     log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+     val (logManager, _) = initializeLogManagerForSimulatingErrorTest()
      // Load logs after a clean shutdown
      Files.createFile(cleanShutdownFile.toPath)
@@ -158,22 +192,37 @@
    }
    locally {
-     simulateError.hasError = true
-     val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-     log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+     val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(logDirFailureChannel)
-     // Simulate error
-     assertThrows(classOf[RuntimeException], () => {
-       val defaultConfig = logManager.currentDefaultConfig
-       logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-     })
+     // Simulate Runtime error
+     simulateError.hasError = true
+     simulateError.errorType = ErrorTypes.RuntimeException
+     assertThrows(classOf[RuntimeException], runLoadLogs)
      assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+
+     assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
+
+     // Simulate Kafka storage error with IOException cause
+     // in this case, the logDir will be added into the offline list before the KafkaStorageException is thrown, so we don't verify it here
+     simulateError.errorType = ErrorTypes.KafkaStorageExceptionWithIOExceptionCause
+     assertDoesNotThrow(runLoadLogs, "KafkaStorageException with IOException cause should be caught and handled")
+
+     // Simulate Kafka storage error without IOException cause
+     simulateError.errorType = ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause
+     assertThrows(classOf[KafkaStorageException], runLoadLogs, "should throw exception when KafkaStorageException without IOException cause")
+
+     assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when KafkaStorageException without IOException cause thrown")
+
+     // Simulate IO error
+     simulateError.errorType = ErrorTypes.IOException
+     assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled")
+
+     assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown")
+
      // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time.
      simulateError.hasError = false
      cleanShutdownInterceptedValue = true
      val defaultConfig = logManager.currentDefaultConfig
      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
      assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
+     logManager.shutdown()
    }
  }
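A side note on the test mechanics above: JUnit 5's Executable is a functional
interface, so a Scala lambda converts to it directly via SAM conversion; that is
what lets the test build runLoadLogs once and reuse it across assertThrows and
assertDoesNotThrow. A minimal, hypothetical illustration:

    import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
    import org.junit.jupiter.api.Test
    import org.junit.jupiter.api.function.Executable

    class ExecutableSamExampleTest {
      @Test
      def samConversionWorks(): Unit = {
        val ok: Executable = () => ()  // Scala lambda converts to the Java SAM type
        val boom: Executable = () => throw new IllegalStateException("boom")
        assertDoesNotThrow(ok, "a no-op must not throw")
        assertThrows(classOf[IllegalStateException], boom, "boom must throw")
      }
    }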
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 08c00bcae6..96aeac5fa6 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.server
-import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
+import kafka.utils.{CoreUtils, Exit, TestInfoUtils, TestUtils}
import java.io.{DataInputStream, File}
import java.net.ServerSocket
@@ -30,7 +30,6 @@ import kafka.zookeeper.ZooKeeperClientTimeoutException
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
@@ -41,6 +40,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -158,14 +158,33 @@ class ServerShutdownTest extends KafkaServerTestHarness {
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk", "kraft"))
- def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+ def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
    createTopic(topic)
    shutdownBroker()
    config.logDirs.foreach { dirName =>
      val partitionDir = new File(dirName, s"$topic-0")
      partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
    }
-   verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
+
+   val expectedStatusCode = Some(1)
+   @volatile var receivedStatusCode = Option.empty[Int]
+   @volatile var hasHaltProcedureCalled = false
+   Exit.setHaltProcedure((statusCode, _) => {
+     hasHaltProcedureCalled = true
+     receivedStatusCode = Some(statusCode)
+   }.asInstanceOf[Nothing])
+
+   try {
+     val recreateBrokerExec: Executable = () => recreateBroker(true)
+     // this startup should fail with no online log dir (due to corrupted logs), and exit directly without throwing an exception
+     assertDoesNotThrow(recreateBrokerExec)
+     // JVM should exit with status code 1
+     TestUtils.waitUntilTrue(() => hasHaltProcedureCalled == true && expectedStatusCode == receivedStatusCode,
+       s"Expected to halt directly with the expected status code:${expectedStatusCode.get}, " +
+         s"but got hasHaltProcedureCalled: $hasHaltProcedureCalled and received status code: ${receivedStatusCode.orNull}")
+   } finally {
+     Exit.resetHaltProcedure()
+   }
  }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)