This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70d3b19 KAFKA-6243: Enable dynamic updates of broker metrics reporters (#4464)
70d3b19 is described below
commit 70d3b19b1126ae64c214615adef590275f218ee7
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Jan 30 12:55:32 2018 -0800
KAFKA-6243: Enable dynamic updates of broker metrics reporters (#4464)
Dynamic metrics reporter updates described in KIP-226. This includes:
- Addition and removal of metrics reporters
- Reconfiguration of custom metrics reporter configs
- Tests for metrics reporter updates at default cluster-level and as per-broker config for testing
Reviewers: Jason Gustafson <[email protected]>
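For reference, the list managed by this change is the metric.reporters broker config. A rough sketch of exercising the new code path from the Java AdminClient (not part of this patch; the reporter class name and bootstrap address are placeholders):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object AddReporterSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
        val admin = AdminClient.create(props)
        try {
          // An empty broker name addresses the cluster-wide default resource used for these updates
          val brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "")
          val entry = new ConfigEntry("metric.reporters", "com.example.MyMetricsReporter") // placeholder class
          val update = Collections.singletonMap(brokerDefaults, new Config(Collections.singletonList(entry)))
          admin.alterConfigs(update).all.get // brokers create and configure the reporter without a restart
        } finally admin.close()
      }
    }

Deleting the config the same way removes and closes the reporter, which is what the new test below verifies.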
---
.../apache/kafka/common/config/AbstractConfig.java | 21 ++-
.../org/apache/kafka/common/metrics/Metrics.java | 9 +
.../java/org/apache/kafka/common/utils/Utils.java | 15 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 13 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 103 +++++++++--
core/src/main/scala/kafka/server/KafkaServer.scala | 29 ++-
.../server/DynamicBrokerReconfigurationTest.scala | 203 +++++++++++++++++++--
7 files changed, 343 insertions(+), 50 deletions(-)
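The broker changes below rely on attaching and detaching reporters on a live Metrics instance. A standalone sketch of that lifecycle (the JMX prefix is arbitrary, and it assumes addReporter hands the reporter the currently registered metrics, as the new test expects):

    import org.apache.kafka.common.metrics.{JmxReporter, Metrics}

    object ReporterLifecycleSketch {
      def main(args: Array[String]): Unit = {
        val metrics = new Metrics()                      // standalone registry, not a broker
        val reporter = new JmxReporter("example.prefix")
        metrics.addReporter(reporter)                    // reporter now sees existing and future metrics
        // ... sensors are created and recorded here ...
        metrics.removeReporter(reporter)                 // added by this commit: detaches and closes the reporter
        metrics.close()
      }
    }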
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9e32074..427c492 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -288,11 +288,26 @@ public class AbstractConfig {
* @return The list of configured instances
*/
public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) {
- List<String> klasses = getList(key);
- List<T> objects = new ArrayList<>();
+ return getConfiguredInstances(getList(key), t, configOverrides);
+ }
+
+
+ /**
+ * Get a list of configured instances of the given class specified by the given configuration key. The configuration
+ * may specify either null or an empty string to indicate no configured instances. In both cases, this method
+ * returns an empty list to indicate no configured instances.
+ * @param classNames The list of class names of the instances to create
+ * @param t The interface the class should implement
+ * @param configOverrides Configuration overrides to use.
+ * @return The list of configured instances
+ */
+ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
+ List<T> objects = new ArrayList<T>();
+ if (classNames == null)
+ return objects;
Map<String, Object> configPairs = originals();
configPairs.putAll(configOverrides);
- for (Object klass : klasses) {
+ for (Object klass : classNames) {
Object o;
if (klass instanceof String) {
try {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 68e1f47..7f9fb9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -526,6 +526,15 @@ public class Metrics implements Closeable {
this.reporters.add(reporter);
}
+ /**
+ * Remove a MetricReporter
+ */
+ public synchronized void removeReporter(MetricsReporter reporter) {
+ if (this.reporters.remove(reporter)) {
+ reporter.close();
+ }
+ }
+
synchronized void registerMetric(KafkaMetric metric) {
MetricName metricName = metric.metricName();
if (this.metrics.containsKey(metricName))
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index dff6107..c7a654a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -304,11 +304,22 @@ public final class Utils {
* Look up the class by name and instantiate it.
* @param klass class name
* @param base super class of the class to be instantiated
- * @param <T>
+ * @param <T> the type of the base class
* @return the new instance
*/
public static <T> T newInstance(String klass, Class<T> base) throws ClassNotFoundException {
- return Utils.newInstance(Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base));
+ return Utils.newInstance(loadClass(klass, base));
+ }
+
+ /**
+ * Look up a class by name.
+ * @param klass class name
+ * @param base super class of the class for verification
+ * @param <T> the type of the base class
+ * @return the new class
+ */
+ public static <T> Class<? extends T> loadClass(String klass, Class<T> base) throws ClassNotFoundException {
+ return Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base);
}
/**
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e013cfb..063f443 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,6 +32,7 @@ import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
@@ -154,14 +155,22 @@ class LogCleaner(initialConfig: CleanerConfig,
cleaners.clear()
}
- override def reconfigurableConfigs(): Set[String] = {
+ override def reconfigurableConfigs: Set[String] = {
LogCleaner.ReconfigurableConfigs
}
- override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
val numThreads = newCleanerConfig.numThreads
- numThreads >= 1 && numThreads >= config.numThreads / 2 && numThreads <= config.numThreads * 2
+ val currentThreads = config.numThreads
+ if (numThreads <= 0)
+ throw new ConfigException(s"Log cleaner threads should be at least 1")
+ if (numThreads < currentThreads / 2)
+ throw new ConfigException(s"Log cleaner threads cannot be reduced to less than half the current value $currentThreads")
+ if (numThreads > currentThreads * 2)
+ throw new ConfigException(s"Log cleaner threads cannot be increased to more than double the current value $currentThreads")
+
}
/**
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 58fa583..168654d 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -19,7 +19,7 @@ package kafka.server
import java.nio.charset.StandardCharsets
import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.log.{LogCleaner, LogConfig, LogManager}
@@ -29,7 +29,8 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
import org.apache.kafka.common.network.ListenerReconfigurable
-import org.apache.kafka.common.utils.Base64
+import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.utils.{Base64, Utils}
import scala.collection._
import scala.collection.JavaConverters._
@@ -81,6 +82,7 @@ object DynamicBrokerConfig {
AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
+ AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
private val PerBrokerConfigs = DynamicSecurityConfigs
@@ -134,6 +136,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+ addReconfigurable(new DynamicMetricsReporters(brokerId, kafkaServer))
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -295,6 +298,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
+ private[server] def maybeReconfigure(reconfigurable: Reconfigurable, oldConfig: KafkaConfig, newConfig: util.Map[String, _]): Unit = {
+ if (reconfigurable.reconfigurableConfigs.asScala.exists(key => oldConfig.originals.get(key) != newConfig.get(key)))
+ reconfigurable.reconfigure(newConfig)
+ }
+
private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_, _]): mutable.Map[String, _] = {
newProps.asScala.filter {
case (k, v) => v != currentProps.get(k)
@@ -388,10 +396,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
oldConfig: KafkaConfig,
newConfig: KafkaConfig,
validateOnly: Boolean): Unit = {
- if (validateOnly) {
- if (!reconfigurable.validateReconfiguration(newConfig))
- throw new ConfigException("Validation of dynamic config update failed")
- } else
+ if (validateOnly)
+ reconfigurable.validateReconfiguration(newConfig)
+ else
reconfigurable.reconfigure(oldConfig, newConfig)
}
}
@@ -400,7 +407,7 @@ trait BrokerReconfigurable {
def reconfigurableConfigs: Set[String]
- def validateReconfiguration(newConfig: KafkaConfig): Boolean
+ def validateReconfiguration(newConfig: KafkaConfig): Unit
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
@@ -463,12 +470,12 @@ object DynamicThreadPool {
class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
- override def reconfigurableConfigs(): Set[String] = {
+ override def reconfigurableConfigs: Set[String] = {
DynamicThreadPool.ReconfigurableConfigs
}
- override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
- newConfig.values.asScala.filterKeys(DynamicThreadPool.ReconfigurableConfigs.contains).forall { case (k, v) =>
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ newConfig.values.asScala.filterKeys(DynamicThreadPool.ReconfigurableConfigs.contains).foreach { case (k, v) =>
val newValue = v.asInstanceOf[Int]
val oldValue = currentValue(k)
if (newValue != oldValue) {
@@ -480,7 +487,6 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
if (newValue > oldValue * 2)
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
}
- true
}
}
@@ -508,3 +514,78 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
}
}
}
+
+class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconfigurable {
+
+ private val dynamicConfig = server.config.dynamicConfig
+ private val metrics = server.metrics
+ private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
+ private val currentReporters = mutable.Map[String, MetricsReporter]()
+
+ createReporters(dynamicConfig.currentKafkaConfig.getList(KafkaConfig.MetricReporterClassesProp),
+ Collections.emptyMap[String, Object])
+
+ private[server] def currentMetricsReporters: List[MetricsReporter] = currentReporters.values.toList
+
+ override def configure(configs: util.Map[String, _]): Unit = {}
+
+ override def reconfigurableConfigs(): util.Set[String] = {
+ val configs = new util.HashSet[String]()
+ configs.add(KafkaConfig.MetricReporterClassesProp)
+ currentReporters.values.foreach {
+ case reporter: Reconfigurable => configs.addAll(reporter.reconfigurableConfigs)
+ case _ =>
+ }
+ configs
+ }
+
+ override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+ val updatedMetricsReporters = metricsReporterClasses(configs)
+
+ // Ensure all the reporter classes can be loaded and have a default constructor
+ updatedMetricsReporters.foreach { className =>
+ val clazz = Utils.loadClass(className, classOf[MetricsReporter])
+ clazz.getConstructor()
+ }
+
+ // Validate the new configuration using every reconfigurable reporter instance that is not being deleted
+ currentReporters.values.forall {
+ case reporter: Reconfigurable =>
+ !updatedMetricsReporters.contains(reporter.getClass.getName) || reporter.validateReconfiguration(configs)
+ case _ => true
+ }
+ }
+
+ override def reconfigure(configs: util.Map[String, _]): Unit = {
+ val updatedMetricsReporters = metricsReporterClasses(configs)
+ val deleted = currentReporters.keySet -- updatedMetricsReporters
+ deleted.foreach(removeReporter)
+ currentReporters.values.foreach {
+ case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs)
+ case _ =>
+ }
+ val added = updatedMetricsReporters -- currentReporters.keySet
+ createReporters(added.asJava, configs)
+ }
+
+ private def createReporters(reporterClasses: util.List[String],
+ updatedConfigs: util.Map[String, _]): Unit = {
+ val props = new util.HashMap[String, AnyRef]
+ updatedConfigs.asScala.foreach { case (k, v) => props.put(k, v.asInstanceOf[AnyRef]) }
+ propsOverride.foreach { case (k, v) => props.put(k, v) }
+ val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
+ reporters.asScala.foreach { reporter =>
+ metrics.addReporter(reporter)
+ currentReporters += reporter.getClass.getName -> reporter
+ }
+ server.notifyClusterListeners(reporters.asScala)
+ }
+
+ private def removeReporter(className: String): Unit = {
+ currentReporters.remove(className).foreach(metrics.removeReporter)
+ }
+
+ private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
+ configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+ }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c4123f1..98e4877 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -186,10 +186,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
try {
info("starting")
- if(isShuttingDown.get)
+ if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
- if(startupComplete.get)
+ if (startupComplete.get)
return
val canStartup = isStartingUp.compareAndSet(false, true)
@@ -218,8 +218,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
this.logIdent = logContext.logPrefix
/* create and configure metrics */
- val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
- Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
+ val reporters = new util.ArrayList[MetricsReporter]
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
@@ -228,7 +227,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
_brokerTopicStats = new BrokerTopicStats
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
- notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
+ notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -321,7 +320,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
}
- private def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
+ private[server] def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
val clusterResourceListeners = new ClusterResourceListeners
clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
@@ -387,7 +386,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
/**
- * Performs controlled shutdown
+ * Performs controlled shutdown
*/
private def controlledShutdown() {
@@ -637,7 +636,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
*
* @return A 2-tuple containing the brokerId and a sequence of offline log directories.
*/
- private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) = {
+ private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) = {
var brokerId = config.brokerId
val brokerIdSet = mutable.HashSet[Int]()
val offlineDirs = mutable.ArrayBuffer.empty[String]
@@ -649,24 +648,24 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
brokerIdSet.add(brokerMetadata.brokerId)
}
} catch {
- case e : IOException =>
+ case e: IOException =>
offlineDirs += logDir
error(s"Fail to read $brokerMetaPropsFile under log directory $logDir", e)
}
}
- if(brokerIdSet.size > 1)
+ if (brokerIdSet.size > 1)
throw new InconsistentBrokerIdException(
s"Failed to match broker.id across log.dirs. This could happen if multiple brokers shared a log directory (log.dirs) " +
s"or partial data was manually copied from another broker. Found $brokerIdSet")
- else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
+ else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
throw new InconsistentBrokerIdException(
s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
s"If you moved your data, make sure your configured broker.id matches. " +
s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
- else if(brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
+ else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
brokerId = generateBrokerId
- else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
+ else if (brokerIdSet.size == 1) // pick broker.id from meta.properties
brokerId = brokerIdSet.last
@@ -678,11 +677,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
- if(brokerMetadataOpt.isEmpty)
+ if (brokerMetadataOpt.isEmpty)
logDirsWithoutMetaProps ++= List(logDir)
}
- for(logDir <- logDirsWithoutMetaProps) {
+ for (logDir <- logDirsWithoutMetaProps) {
val checkpoint = brokerMetadataCheckpoints(logDir)
checkpoint.write(BrokerMetadata(brokerId))
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 819d672..374aac2 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,11 +18,14 @@
package kafka.server
+import java.io.Closeable
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
+import java.lang.management.ManagementFactory
import java.util
import java.util.{Collections, Properties}
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
+import javax.management.ObjectName
import kafka.api.SaslSetup
import kafka.log.LogConfig
@@ -35,12 +38,13 @@ import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -48,6 +52,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
@@ -93,6 +98,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.LogSegmentBytesProp, "2000")
+ props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
props ++= sslProperties1
addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -112,6 +118,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
createAdminClient(SecurityProtocol.SSL, SecureInternal)
+
+ TestMetricsReporter.testReporters.clear()
}
@After
@@ -351,15 +359,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString))
consumerThread.waitForMatchingRecords(record => record.timestampType == TimestampType.CREATE_TIME)
-
- // Verify that even though broker defaults can be defined at default cluster level for consistent
- // configuration across brokers, they can also be defined at per-broker level for testing
- props.clear()
- props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
- alterConfigsOnServer(servers.head, props)
- assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
- servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
-
// Verify that invalid configs are not applied
val invalidProps = Map(
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp -> "abc", // Invalid type
@@ -373,6 +372,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
reconfigureServers(props, perBrokerConfig = false, (k, props.getProperty(k)), expectFailure = true)
}
+ // Verify that even though broker defaults can be defined at default cluster level for consistent
+ // configuration across brokers, they can also be defined at per-broker level for testing
+ props.clear()
+ props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
+ alterConfigsOnServer(servers.head, props)
+ assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
+ servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
+
// Verify that produce/consume worked throughout this test without any retries in producer
stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
}
@@ -449,6 +456,81 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
"", mayFailRequests = false)
}
+ @Test
+ def testMetricsReporterUpdate(): Unit = {
+ // Add a new metrics reporter
+ val newProps = new Properties
+ newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
+ configureMetricsReporters(Seq(classOf[TestMetricsReporter]), newProps)
+
+ val reporters = TestMetricsReporter.waitForReporters(servers.size)
+ reporters.foreach { reporter =>
+ reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 100)
+ assertFalse("No metrics found", reporter.kafkaMetrics.isEmpty)
+ reporter.verifyMetricValue("request-total", "socket-server-metrics")
+ }
+
+ val clientId = "test-client-1"
+ val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId)
+ TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent")
+
+ // Verify that JMX reporter is still active (test a metric registered after the dynamic reporter update)
+ val mbeanServer = ManagementFactory.getPlatformMBeanServer
+ val byteRate = mbeanServer.getAttribute(new ObjectName(s"kafka.server:type=Produce,client-id=$clientId"), "byte-rate")
+ assertTrue("JMX attribute not updated", byteRate.asInstanceOf[Double] > 0)
+
+ // Property not related to the metrics reporter config should not reconfigure reporter
+ newProps.setProperty("some.prop", "some.value")
+ reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp, "100"))
+ reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 100))
+
+ // Update of custom config of metrics reporter should reconfigure reporter
+ newProps.put(TestMetricsReporter.PollingIntervalProp, "1000")
+ reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp, "1000"))
+ reporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 1000))
+
+ // Verify removal of metrics reporter
+ configureMetricsReporters(Seq.empty[Class[_]], newProps)
+ reporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 1, pollingInterval = 1000))
+ TestMetricsReporter.testReporters.clear()
+
+ // Verify recreation of metrics reporter
+ newProps.put(TestMetricsReporter.PollingIntervalProp, "2000")
+ configureMetricsReporters(Seq(classOf[TestMetricsReporter]), newProps)
+ val newReporters = TestMetricsReporter.waitForReporters(servers.size)
+ newReporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 2000))
+
+ // Verify that validation failure of metrics reporter fails reconfiguration and leaves config unchanged
+ newProps.put(KafkaConfig.MetricReporterClassesProp, "unknownMetricsReporter")
+ reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp, "2000"), expectFailure = true)
+ servers.foreach { server =>
+ assertEquals(classOf[TestMetricsReporter].getName, server.config.originals.get(KafkaConfig.MetricReporterClassesProp))
+ }
+ newReporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 2000))
+
+ // Verify that validation failure of custom config fails reconfiguration and leaves config unchanged
+ newProps.put(TestMetricsReporter.PollingIntervalProp, "invalid")
+ reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp, "2000"), expectFailure = true)
+ newReporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 2000))
+
+ // Delete reporters
+ configureMetricsReporters(Seq.empty[Class[_]], newProps)
+ TestMetricsReporter.testReporters.clear()
+
+ // Verify that even though metrics reporters can be defined at default cluster level for consistent
+ // configuration across brokers, they can also be defined at per-broker level for testing
+ newProps.put(KafkaConfig.MetricReporterClassesProp, classOf[TestMetricsReporter].getName)
+ newProps.put(TestMetricsReporter.PollingIntervalProp, "4000")
+ alterConfigsOnServer(servers.head, newProps)
+ TestUtils.waitUntilTrue(() => !TestMetricsReporter.testReporters.isEmpty, "Metrics reporter not created")
+ val perBrokerReporter = TestMetricsReporter.waitForReporters(1).head
+ perBrokerReporter.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval = 4000)
+ servers.tail.foreach { server => assertEquals("", server.config.originals.get(KafkaConfig.MetricReporterClassesProp)) }
+
+ // Verify that produce/consume worked throughout this test without any retries in producer
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
+ }
+
private def createProducer(trustStore: File, retries: Int,
clientId: String = "test-producer"): KafkaProducer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
@@ -575,7 +657,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get)
assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
}
- assertEquals(oldProps, servers.head.config.values.asScala.filterKeys(newProps.containsKey))
+ servers.foreach { server => assertEquals(oldProps, server.config.values.asScala.filterKeys(newProps.containsKey)) }
} else {
alterResult.all.get
waitForConfig(aPropToVerify._1, aPropToVerify._2)
@@ -619,6 +701,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
+ private def configureMetricsReporters(reporters: Seq[Class[_]], props: Properties,
+ perBrokerConfig: Boolean = false): Unit = {
+ val reporterStr = reporters.map(_.getName).mkString(",")
+ props.put(KafkaConfig.MetricReporterClassesProp, reporterStr)
+ reconfigureServers(props, perBrokerConfig, (KafkaConfig.MetricReporterClassesProp, reporterStr))
+ }
+
private def invalidSslConfigs: Properties = {
val props = new Properties
props.put(SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
@@ -644,8 +733,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
assertTrue(s"Invalid threads: expected $expectedCount, got ${threads.size}: $threads", resized)
}
- private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = {
- val producerThread = new ProducerThread(retries)
+ private def startProduceConsume(retries: Int, producerClientId: String = "test-producer"): (ProducerThread, ConsumerThread) = {
+ val producerThread = new ProducerThread(producerClientId, retries)
clientThreads += producerThread
val consumerThread = new ConsumerThread(producerThread)
clientThreads += consumerThread
@@ -656,7 +745,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread: ConsumerThread,
- mayFailRequests: Boolean): Unit = {
+ mayFailRequests: Boolean = false): Unit = {
TestUtils.waitUntilTrue(() => producerThread.sent >= 10, "Messages not sent")
producerThread.shutdown()
consumerThread.initiateShutdown()
@@ -669,8 +758,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
- private class ProducerThread(retries: Int) extends ShutdownableThread("test-producer", isInterruptible = false) {
- private val producer = createProducer(trustStoreFile1, retries)
+ private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
+ private val producer = createProducer(trustStoreFile1, retries, clientId)
@volatile var sent = 0
override def doWork(): Unit = {
try {
@@ -719,3 +808,83 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
}
+
+object TestMetricsReporter {
+ val PollingIntervalProp = "polling.interval"
+ val testReporters = new ConcurrentLinkedQueue[TestMetricsReporter]()
+
+ def waitForReporters(count: Int): List[TestMetricsReporter] = {
+ TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics reporters not created")
+
+ val reporters = testReporters.asScala.toList
+ TestUtils.waitUntilTrue(() => reporters.forall(_.configureCount == 1), msg = "Metrics reporters not configured")
+ reporters
+ }
+}
+
+class TestMetricsReporter extends MetricsReporter with Reconfigurable with Closeable with ClusterResourceListener {
+ import TestMetricsReporter._
+ val kafkaMetrics = ArrayBuffer[KafkaMetric]()
+ @volatile var initializeCount = 0
+ @volatile var configureCount = 0
+ @volatile var reconfigureCount = 0
+ @volatile var closeCount = 0
+ @volatile var clusterUpdateCount = 0
+ @volatile var pollingInterval: Int = -1
+ testReporters.add(this)
+
+ override def init(metrics: util.List[KafkaMetric]): Unit = {
+ kafkaMetrics ++= metrics.asScala
+ initializeCount += 1
+ }
+
+ override def configure(configs: util.Map[String, _]): Unit = {
+ configureCount += 1
+ pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+ }
+
+ override def metricChange(metric: KafkaMetric): Unit = {
+ }
+
+ override def metricRemoval(metric: KafkaMetric): Unit = {
+ kafkaMetrics -= metric
+ }
+
+ override def onUpdate(clusterResource: ClusterResource): Unit = {
+ assertNotNull("Cluster id not set", clusterResource.clusterId)
+ clusterUpdateCount += 1
+ }
+
+ override def reconfigurableConfigs(): util.Set[String] = {
+ Set(PollingIntervalProp).asJava
+ }
+
+ override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+ configs.get(PollingIntervalProp).toString.toInt > 0
+ }
+
+ override def reconfigure(configs: util.Map[String, _]): Unit = {
+ reconfigureCount += 1
+ pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+ }
+
+ override def close(): Unit = {
+ closeCount += 1
+ }
+
+ def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int): Unit = {
+ assertEquals(1, initializeCount)
+ assertEquals(1, configureCount)
+ assertEquals(reconfigureCount, this.reconfigureCount)
+ assertEquals(deleteCount, closeCount)
+ assertEquals(1, clusterUpdateCount)
+ assertEquals(pollingInterval, this.pollingInterval)
+ }
+
+ def verifyMetricValue(name: String, group: String): Unit = {
+ val matchingMetrics = kafkaMetrics.filter(metric => metric.metricName.name == name && metric.metricName.group == group)
+ assertTrue("Metric not found", matchingMetrics.nonEmpty)
+ val total = matchingMetrics.foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
+ assertTrue("Invalid metric value", total > 0.0)
+ }
+}
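Not part of this patch: a minimal sketch of what a third-party reporter must implement so the broker reconfigures it in place instead of recreating it (the class and the custom.interval.ms key are placeholders; the interfaces are the ones exercised by TestMetricsReporter above):

    import java.util
    import org.apache.kafka.common.Reconfigurable
    import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
    import scala.collection.JavaConverters._

    class SimpleReconfigurableReporter extends MetricsReporter with Reconfigurable {
      @volatile private var intervalMs = 30000L

      override def configure(configs: util.Map[String, _]): Unit = reconfigure(configs)
      override def init(metrics: util.List[KafkaMetric]): Unit = {}
      override def metricChange(metric: KafkaMetric): Unit = {}
      override def metricRemoval(metric: KafkaMetric): Unit = {}
      override def close(): Unit = {}

      // Only this key may be updated dynamically; other keys are ignored for this reporter
      override def reconfigurableConfigs(): util.Set[String] = Set("custom.interval.ms").asJava

      override def validateReconfiguration(configs: util.Map[String, _]): Boolean =
        try Option(configs.get("custom.interval.ms")).forall(_.toString.toLong > 0)
        catch { case _: NumberFormatException => false }

      override def reconfigure(configs: util.Map[String, _]): Unit =
        Option(configs.get("custom.interval.ms")).foreach(v => intervalMs = v.toString.toLong)
    }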
--
To stop receiving notification emails like this one, please contact
[email protected].