This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2fc1875d0b1 KAFKA-14557; Lock metadata log dir (#13058)
2fc1875d0b1 is described below
commit 2fc1875d0b1e5cd58727bbe80ee6d06602671993
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jan 10 10:18:40 2023 -0800
KAFKA-14557; Lock metadata log dir (#13058)
This change makes sure that Kafka grabs a log dir lock in the following
additional cases:
1. When a Kafka node runs in controller-only mode. The current implementation
doesn't grab a file lock because the LogManager is never instantiated.
2. When the metadata log dir is different from the log dir(s). The current
implementation of LogManager doesn't load or grab a lock on the metadata dir.
Reviewers: Ron Dagostino <[email protected]>, dengziming
<[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 4 +-
core/src/main/scala/kafka/raft/RaftManager.scala | 58 +++++--
.../main/scala/kafka/server/KafkaRaftServer.scala | 8 +-
.../scala/unit/kafka/raft/RaftManagerTest.scala | 172 ++++++++++++++++-----
4 files changed, 189 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index a82cf5d1bfd..1a3f28eaba4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -79,7 +79,6 @@ class LogManager(logDirs: Seq[File],
import LogManager._
- val LockFile = ".lock"
val InitialTaskDelayMs = 30 * 1000
private val logCreationOrDeletionLock = new Object
@@ -241,7 +240,7 @@ class LogManager(logDirs: Seq[File],
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
dirs.flatMap { dir =>
try {
- val lock = new FileLock(new File(dir, LockFile))
+ val lock = new FileLock(new File(dir, LockFileName))
if (!lock.tryLock())
throw new KafkaException("Failed to acquire lock on file .lock in "
+ lock.file.getParent +
". A Kafka instance in another process or thread is using this
directory.")
@@ -1341,6 +1340,7 @@ class LogManager(logDirs: Seq[File],
}
object LogManager {
+ val LockFileName = ".lock"
/**
* Wait all jobs to complete
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 5b8fe1e8276..bbb31806c3b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -18,15 +18,25 @@ package kafka.raft
import java.io.File
import java.nio.file.Files
+import java.nio.file.Paths
import java.util
import java.util.OptionalInt
import java.util.concurrent.CompletableFuture
+import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.raft.KafkaRaftManager.RaftIoThread
+import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.CoreUtils
+import kafka.utils.FileLock
+import kafka.utils.KafkaScheduler
+import kafka.utils.Logging
+import kafka.utils.ShutdownableThread
import kafka.utils.timer.SystemTimer
-import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater,
NetworkClient}
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName,
NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
@@ -34,7 +44,6 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec,
NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient,
LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog}
import org.apache.kafka.server.common.serialization.RecordSerde
@@ -78,6 +87,19 @@ object KafkaRaftManager {
Files.createDirectories(dir.toPath)
dir
}
+
+ private def lockDataDir(dataDir: File): FileLock = {
+ val lock = new FileLock(new File(dataDir, LogManager.LockFileName))
+
+ if (!lock.tryLock()) {
+ throw new KafkaException(
+ s"Failed to acquire lock on file .lock in ${lock.file.getParent}. A
Kafka instance in another process or " +
+ "thread is using this directory."
+ )
+ }
+
+ lock
+ }
}
trait RaftManager[T] {
@@ -120,6 +142,23 @@ class KafkaRaftManager[T](
scheduler.startup()
private val dataDir = createDataDir()
+
+ private val dataDirLock = {
+ // Acquire the log dir lock if the metadata log dir is different from the
log dirs
+ val differentMetadataLogDir = !config
+ .logDirs
+ .map(Paths.get(_).toAbsolutePath)
+ .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+ // Or this node is only a controller
+ val isOnlyController = config.processRoles == Set(ControllerRole)
+
+ if (differentMetadataLogDir || isOnlyController) {
+ Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir)))
+ } else {
+ None
+ }
+ }
+
override val replicatedLog: ReplicatedLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
private val expirationTimer = new SystemTimer("raft-expiration-executor")
@@ -147,13 +186,14 @@ class KafkaRaftManager[T](
}
def shutdown(): Unit = {
- expirationService.shutdown()
- expirationTimer.shutdown()
- raftIoThread.shutdown()
- client.close()
- scheduler.shutdown()
- netChannel.close()
- replicatedLog.close()
+ CoreUtils.swallow(expirationService.shutdown(), this)
+ CoreUtils.swallow(expirationTimer.shutdown(), this)
+ CoreUtils.swallow(raftIoThread.shutdown(), this)
+ CoreUtils.swallow(client.close(), this)
+ CoreUtils.swallow(scheduler.shutdown(), this)
+ CoreUtils.swallow(netChannel.close(), this)
+ CoreUtils.swallow(replicatedLog.close(), this)
+ CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this)
}
override def register(
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 9cb95e39166..f5f2c121975 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -124,8 +124,12 @@ object KafkaRaftServer {
val MetadataTopicId = Uuid.METADATA_TOPIC_ID
sealed trait ProcessRole
- case object BrokerRole extends ProcessRole
- case object ControllerRole extends ProcessRole
+ case object BrokerRole extends ProcessRole {
+ override def toString(): String = "broker"
+ }
+ case object ControllerRole extends ProcessRole {
+ override def toString(): String = "controller"
+ }
/**
* Initialize the configured log directories, including both
[[KafkaConfig.MetadataLogDirProp]]
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 9d7a93db94c..9907c802d2b 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -16,10 +16,20 @@
*/
package kafka.raft
-import java.util.concurrent.CompletableFuture
+import java.nio.channels.FileChannel
+import java.nio.channels.OverlappingFileLockException
+import java.nio.file.Path
+import java.nio.file.StandardOpenOption
import java.util.Properties
+import java.util.concurrent.CompletableFuture
+import kafka.log.LogManager
import kafka.raft.KafkaRaftManager.RaftIoThread
-import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.server.KafkaConfig
+import kafka.server.KafkaRaftServer.BrokerRole
+import kafka.server.KafkaRaftServer.ControllerRole
+import kafka.server.KafkaRaftServer.ProcessRole
+import kafka.server.MetaProperties
+import kafka.utils.TestUtils
import kafka.tools.TestRaftServer.ByteArraySerde
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
@@ -27,41 +37,50 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito._
-import java.io.File
-
class RaftManagerTest {
-
- private def instantiateRaftManagerWithConfigs(topicPartition:
TopicPartition, processRoles: String, nodeId: String) = {
- def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String,
logDir: File): KafkaConfig = {
- val props = new Properties
- props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
- props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
- props.setProperty(KafkaConfig.NodeIdProp, nodeId)
- props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
- if (processRoles.contains("broker")) {
- props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
- if (processRoles.contains("controller")) { // co-located
- props.setProperty(KafkaConfig.ListenersProp,
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
- props.setProperty(KafkaConfig.QuorumVotersProp,
s"${nodeId}@localhost:9093")
- } else { // broker-only
- val voterId = (nodeId.toInt + 1)
- props.setProperty(KafkaConfig.QuorumVotersProp,
s"${voterId}@localhost:9093")
- }
- } else if (processRoles.contains("controller")) { // controller-only
- props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+ private def createConfig(
+ processRoles: Set[ProcessRole],
+ nodeId: Int,
+ logDir: Option[Path],
+ metadataDir: Option[Path]
+ ): KafkaConfig = {
+ val props = new Properties
+ logDir.foreach { value =>
+ props.setProperty(KafkaConfig.LogDirProp, value.toString)
+ }
+ metadataDir.foreach { value =>
+ props.setProperty(KafkaConfig.MetadataLogDirProp, value.toString)
+ }
+ props.setProperty(KafkaConfig.ProcessRolesProp, processRoles.mkString(","))
+ props.setProperty(KafkaConfig.NodeIdProp, nodeId.toString)
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ if (processRoles.contains(BrokerRole)) {
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
+ if (processRoles.contains(ControllerRole)) { // co-located
+ props.setProperty(KafkaConfig.ListenersProp,
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
props.setProperty(KafkaConfig.QuorumVotersProp,
s"${nodeId}@localhost:9093")
+ } else { // broker-only
+ val voterId = nodeId + 1
+ props.setProperty(KafkaConfig.QuorumVotersProp,
s"${voterId}@localhost:9093")
}
-
- new KafkaConfig(props)
+ } else if (processRoles.contains(ControllerRole)) { // controller-only
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+ props.setProperty(KafkaConfig.QuorumVotersProp,
s"${nodeId}@localhost:9093")
}
- val logDir = TestUtils.tempDirectory()
- val config = configWithProcessRolesAndNodeId(processRoles, nodeId, logDir)
+ new KafkaConfig(props)
+ }
+
+ private def createRaftManager(
+ topicPartition: TopicPartition,
+ config: KafkaConfig
+ ): KafkaRaftManager[Array[Byte]] = {
val topicId = new Uuid(0L, 2L)
val metaProperties = MetaProperties(
clusterId = Uuid.randomUuid.toString,
@@ -81,25 +100,99 @@ class RaftManagerTest {
)
}
- @Test
- def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
- val raftManager = instantiateRaftManagerWithConfigs(new
TopicPartition("__raft_id_test", 0), "broker", "1")
- assertEquals(1, raftManager.client.nodeId.getAsInt)
+ @ParameterizedTest
+ @ValueSource(strings = Array("broker", "controller", "broker,controller"))
+ def testNodeIdPresent(processRoles: String): Unit = {
+ var processRolesSet = Set.empty[ProcessRole]
+ if (processRoles.contains("broker")) {
+ processRolesSet = processRolesSet ++ Set(BrokerRole)
+ }
+ if (processRoles.contains("controller")) {
+ processRolesSet = processRolesSet ++ Set(ControllerRole)
+ }
+
+ val logDir = TestUtils.tempDir()
+ val nodeId = 1
+ val raftManager = createRaftManager(
+ new TopicPartition("__raft_id_test", 0),
+ createConfig(
+ processRolesSet,
+ nodeId,
+ Some(logDir.toPath),
+ None
+ )
+ )
+ assertEquals(nodeId, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}
- @Test
- def testNodeIdPresentIfControllerRoleOnly(): Unit = {
- val raftManager = instantiateRaftManagerWithConfigs(new
TopicPartition("__raft_id_test", 0), "controller", "1")
- assertEquals(1, raftManager.client.nodeId.getAsInt)
+ @ParameterizedTest
+ @ValueSource(strings = Array("metadata-only", "log-only", "both"))
+ def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
+ val logDir = if (dirType.equals("metadata-only")) {
+ None
+ } else {
+ Some(TestUtils.tempDir().toPath)
+ }
+
+ val metadataDir = if (dirType.equals("log-only")) {
+ None
+ } else {
+ Some(TestUtils.tempDir().toPath)
+ }
+
+ val nodeId = 1
+ val raftManager = createRaftManager(
+ new TopicPartition("__raft_id_test", 0),
+ createConfig(
+ Set(ControllerRole),
+ nodeId,
+ logDir,
+ metadataDir
+ )
+ )
+
+ val lockPath =
metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+ assertTrue(fileLocked(lockPath))
+
raftManager.shutdown()
+
+ assertFalse(fileLocked(lockPath))
}
@Test
- def testNodeIdPresentIfColocated(): Unit = {
- val raftManager = instantiateRaftManagerWithConfigs(new
TopicPartition("__raft_id_test", 0), "controller,broker", "1")
- assertEquals(1, raftManager.client.nodeId.getAsInt)
+ def testLogDirLockWhenBrokerOnlyWithSeparateMetadataDir(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+
+ val nodeId = 1
+ val raftManager = createRaftManager(
+ new TopicPartition("__raft_id_test", 0),
+ createConfig(
+ Set(BrokerRole),
+ nodeId,
+ logDir,
+ metadataDir
+ )
+ )
+
+ val lockPath =
metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+ assertTrue(fileLocked(lockPath))
+
raftManager.shutdown()
+
+ assertFalse(fileLocked(lockPath))
+ }
+
+ private def fileLocked(path: Path): Boolean = {
+ TestUtils.resource(FileChannel.open(path, StandardOpenOption.WRITE)) {
channel =>
+ try {
+ Option(channel.tryLock()).foreach(_.close())
+ false
+ } catch {
+ case _: OverlappingFileLockException => true
+ }
+ }
}
@Test
@@ -140,5 +233,4 @@ class RaftManagerTest {
assertTrue(ioThread.isThreadFailed)
assertFalse(ioThread.isRunning)
}
-
}