This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bc3972c KAFKA-13219: BrokerState metric not working for KRaft
clusters (#11239)
bc3972c is described below
commit bc3972c9f3e8c6d92e2154501bf18ebcb1a93f64
Author: Ron Dagostino <[email protected]>
AuthorDate: Mon Aug 23 16:45:44 2021 -0400
KAFKA-13219: BrokerState metric not working for KRaft clusters (#11239)
The BrokerState metric always reports a value of 0 (NOT_RUNNING) in KRaft
clusters. This patch fixes that and adds a test.
Reviewers: Ismael Juma <[email protected]>
---
.../scala/kafka/server/BrokerLifecycleManager.scala | 8 ++++----
core/src/main/scala/kafka/server/BrokerServer.scala | 8 ++++----
core/src/main/scala/kafka/server/KafkaBroker.scala | 4 +---
core/src/main/scala/kafka/server/KafkaServer.scala | 15 +++++++++------
.../test/junit/RaftClusterInvocationContext.java | 2 +-
.../integration/kafka/server/KRaftClusterTest.scala | 12 ++++++------
.../kafka/server/BrokerLifecycleManagerTest.scala | 20 ++++++++++----------
7 files changed, 35 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index ae9634b..e15a3e6 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -166,12 +166,12 @@ class BrokerLifecycleManager(val config: KafkaConfig,
* The channel manager, or null if this manager has not been started yet. This variable
* can only be read or written from the event queue thread.
*/
- var _channelManager: BrokerToControllerChannelManager = _
+ private var _channelManager: BrokerToControllerChannelManager = _
/**
* The event queue.
*/
- val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""))
+ private[server] val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""))
/**
* Start the BrokerLifecycleManager.
@@ -193,9 +193,9 @@ class BrokerLifecycleManager(val config: KafkaConfig,
eventQueue.append(new SetReadyToUnfenceEvent())
}
- def brokerEpoch(): Long = _brokerEpoch
+ def brokerEpoch: Long = _brokerEpoch
- def state(): BrokerState = _state
+ def state: BrokerState = _state
private class BeginControlledShutdownEvent extends EventQueue.Event {
override def run(): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index d6e82e7..d2079c4 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -84,6 +84,8 @@ class BrokerServer(
val supportedFeatures: util.Map[String, VersionRange]
) extends KafkaBroker {
+ override def brokerState: BrokerState = lifecycleManager.state
+
import kafka.server.Server._
private val logContext: LogContext = new LogContext(s"[BrokerServer
id=${config.nodeId}] ")
@@ -248,7 +250,7 @@ class BrokerServer(
scheduler = kafkaScheduler,
time = time,
brokerId = config.nodeId,
- brokerEpochSupplier = () => lifecycleManager.brokerEpoch()
+ brokerEpochSupplier = () => lifecycleManager.brokerEpoch
)
alterIsrManager.start()
@@ -270,7 +272,7 @@ class BrokerServer(
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
- brokerEpochSupplier = () => lifecycleManager.brokerEpoch(),
+ brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
clientToControllerChannelManager,
config.requestTimeoutMs
)
@@ -516,6 +518,4 @@ class BrokerServer(
def boundPort(listenerName: ListenerName): Int =
socketServer.boundPort(listenerName)
- def currentState(): BrokerState = lifecycleManager.state()
-
}
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 9b9fc97..91d8a77 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -63,11 +63,9 @@ object KafkaBroker {
}
trait KafkaBroker extends KafkaMetricsGroup {
- @volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
def authorizer: Option[Authorizer]
- def brokerState: BrokerState = _brokerState
- protected def brokerState_= (brokerState: BrokerState): Unit = _brokerState
= brokerState
+ def brokerState: BrokerState
def clusterId: String
def config: KafkaConfig
def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a565e1..b3c66a6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -96,6 +96,7 @@ class KafkaServer(
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)
+ @volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
private var shutdownLatch = new CountDownLatch(1)
private var logContext: LogContext = null
@@ -161,6 +162,8 @@ class KafkaServer(
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
val featureCache: FinalizedFeatureCache = new
FinalizedFeatureCache(brokerFeatures)
+ override def brokerState: BrokerState = _brokerState
+
def clusterId: String = _clusterId
// Visible for testing
@@ -188,7 +191,7 @@ class KafkaServer(
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
- brokerState = BrokerState.STARTING
+ _brokerState = BrokerState.STARTING
/* setup zookeeper */
initZkClient(time)
@@ -250,7 +253,7 @@ class KafkaServer(
logManager = LogManager(config, initialOfflineDirs,
new ZkConfigRepository(new AdminZkClient(zkClient)),
kafkaScheduler, time, brokerTopicStats, logDirFailureChannel,
config.usesTopicId)
- brokerState = BrokerState.RECOVERY
+ _brokerState = BrokerState.RECOVERY
logManager.startup(zkClient.getAllTopicsInCluster())
metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
@@ -418,7 +421,7 @@ class KafkaServer(
socketServer.startProcessingRequests(authorizerFutures)
- brokerState = BrokerState.RUNNING
+ _brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
@@ -631,7 +634,7 @@ class KafkaServer(
// the shutdown.
info("Starting controlled shutdown")
- brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
+ _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
val shutdownSucceeded =
doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
@@ -656,7 +659,7 @@ class KafkaServer(
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false,
true)) {
CoreUtils.swallow(controlledShutdown(), this)
- brokerState = BrokerState.SHUTTING_DOWN
+ _brokerState = BrokerState.SHUTTING_DOWN
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -726,7 +729,7 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
- brokerState = BrokerState.NOT_RUNNING
+ _brokerState = BrokerState.NOT_RUNNING
startupComplete.set(false)
isShuttingDown.set(false)
diff --git
a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index fc6b557..c60e0ec 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -92,7 +92,7 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
cluster.format();
cluster.startup();
kafka.utils.TestUtils.waitUntilTrue(
- () -> cluster.brokers().get(0).currentState() ==
BrokerState.RUNNING,
+ () -> cluster.brokers().get(0).brokerState() ==
BrokerState.RUNNING,
() -> "Broker never made it to RUNNING state.",
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
100L);
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 39afbe4..b7ca0f7 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -30,9 +30,9 @@ import
org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeCluster
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
+
import java.util
import java.util.{Arrays, Collections, Optional}
-
import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@@ -64,7 +64,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() ==
BrokerState.RUNNING,
+ TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")
@@ -90,7 +90,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() ==
BrokerState.RUNNING,
+ TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")
@@ -127,7 +127,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() ==
BrokerState.RUNNING,
+ TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")
@@ -160,7 +160,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() ==
BrokerState.RUNNING,
+ TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
val admin = Admin.create(cluster.clientProperties())
try {
@@ -317,7 +317,7 @@ class KRaftClusterTest {
private def waitForRunningBrokers(count: Int, waitTime: FiniteDuration)
(implicit cluster: KafkaClusterTestKit):
Seq[BrokerServer] = {
def getRunningBrokerServers: Seq[BrokerServer] =
cluster.brokers.values.asScala.toSeq
- .filter(brokerServer => brokerServer.currentState() ==
BrokerState.RUNNING)
+ .filter(brokerServer => brokerServer.brokerState == BrokerState.RUNNING)
val (runningBrokerServers, hasRunningBrokers) =
TestUtils.computeUntilTrue(getRunningBrokerServers,
waitTime.toMillis)(_.nonEmpty)
assertTrue(hasRunningBrokers,
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index a551288..d97724d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -104,15 +104,15 @@ class BrokerLifecycleManagerTest {
def testCreateStartAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time,
None)
- assertEquals(BrokerState.NOT_RUNNING, manager.state())
+ assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
Collections.emptyMap())
TestUtils.retry(60000) {
- assertEquals(BrokerState.STARTING, manager.state())
+ assertEquals(BrokerState.STARTING, manager.state)
}
manager.close()
- assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
+ assertEquals(BrokerState.SHUTTING_DOWN, manager.state)
}
@Test
@@ -128,7 +128,7 @@ class BrokerLifecycleManagerTest {
Collections.emptyMap())
TestUtils.retry(10000) {
context.poll()
- assertEquals(1000L, manager.brokerEpoch())
+ assertEquals(1000L, manager.brokerEpoch)
}
manager.close()
@@ -169,9 +169,9 @@ class BrokerLifecycleManagerTest {
TestUtils.retry(60000) {
context.poll()
manager.eventQueue.wakeup()
- assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
+ assertEquals(BrokerState.SHUTTING_DOWN, manager.state)
assertTrue(manager.initialCatchUpFuture.isCompletedExceptionally())
- assertEquals(-1L, manager.brokerEpoch())
+ assertEquals(-1L, manager.brokerEpoch)
}
manager.close()
}
@@ -192,7 +192,7 @@ class BrokerLifecycleManagerTest {
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
- assertEquals(BrokerState.RECOVERY, manager.state())
+ assertEquals(BrokerState.RECOVERY, manager.state)
}
context.mockClient.prepareResponseFrom(new BrokerHeartbeatResponse(
new BrokerHeartbeatResponseData().setIsFenced(false)), controllerNode)
@@ -200,13 +200,13 @@ class BrokerLifecycleManagerTest {
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
- assertEquals(BrokerState.RUNNING, manager.state())
+ assertEquals(BrokerState.RUNNING, manager.state)
}
manager.beginControlledShutdown()
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
- assertEquals(BrokerState.PENDING_CONTROLLED_SHUTDOWN, manager.state())
+ assertEquals(BrokerState.PENDING_CONTROLLED_SHUTDOWN, manager.state)
assertTrue(context.mockClient.hasInFlightRequests)
}
@@ -226,7 +226,7 @@ class BrokerLifecycleManagerTest {
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
- assertEquals(BrokerState.SHUTTING_DOWN, manager.state())
+ assertEquals(BrokerState.SHUTTING_DOWN, manager.state)
}
manager.controlledShutdownFuture.get()
manager.close()