This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c38a345 MINOR: Fix brokerId passed to metrics reporters (#4497)
c38a345 is described below
commit c38a34559fd078be3ed30162619078f129802353
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Jan 31 17:59:49 2018 -0800
MINOR: Fix brokerId passed to metrics reporters (#4497)
Remove caching of brokerId in DynamicBrokerConfig constructor and delay
initialization until brokerId is set in KafkaConfig.
Reviewers: Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/DynamicBrokerConfig.scala | 5 ++---
core/src/main/scala/kafka/server/KafkaServer.scala | 16 ++++++++--------
.../kafka/server/DynamicBrokerReconfigurationTest.scala | 3 +++
3 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 168654d..9c85f9b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -119,7 +119,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
private val dynamicBrokerConfigs = mutable.Map[String, String]()
private val dynamicDefaultConfigs = mutable.Map[String, String]()
- private val brokerId = kafkaConfig.brokerId
private val reconfigurables = mutable.Buffer[Reconfigurable]()
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
@@ -128,7 +127,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
- updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
+ updateBrokerConfig(kafkaConfig.brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString))
}
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
@@ -136,7 +135,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
- addReconfigurable(new DynamicMetricsReporters(brokerId, kafkaServer))
+ addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 98e4877..747a0df 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -199,14 +199,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* setup zookeeper */
initZkClient(time)
- // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
- // applied after DynamicConfigManager starts.
- config.dynamicConfig.initialize(zkClient)
-
- /* start scheduler */
- kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
- kafkaScheduler.startup()
-
/* Get or create cluster_id */
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
@@ -217,6 +209,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
+ // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
+ // applied after DynamicConfigManager starts.
+ config.dynamicConfig.initialize(zkClient)
+
+ /* start scheduler */
+ kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+ kafkaScheduler.startup()
+
/* create and configure metrics */
val reporters = new util.ArrayList[MetricsReporter]
reporters.add(new JmxReporter(jmxPrefix))
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 374aac2..49d9953 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -469,6 +469,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
assertFalse("No metrics found", reporter.kafkaMetrics.isEmpty)
reporter.verifyMetricValue("request-total", "socket-server-metrics")
}
+ assertEquals(servers.map(_.config.brokerId).toSet, TestMetricsReporter.configuredBrokers.toSet)
val clientId = "test-client-1"
val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId)
@@ -812,6 +813,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
object TestMetricsReporter {
val PollingIntervalProp = "polling.interval"
val testReporters = new ConcurrentLinkedQueue[TestMetricsReporter]()
+ val configuredBrokers = mutable.Set[Int]()
def waitForReporters(count: Int): List[TestMetricsReporter] = {
TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics reporters not created")
@@ -839,6 +841,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
}
override def configure(configs: util.Map[String, _]): Unit = {
+ configuredBrokers += configs.get(KafkaConfig.BrokerIdProp).toString.toInt
configureCount += 1
pollingInterval = configs.get(PollingIntervalProp).toString.toInt
}
--
To stop receiving notification emails like this one, please contact
[email protected].