This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 5d3af5c KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
5d3af5c is described below
commit 5d3af5cc5f1cf4112b91ca9754d3de7d6ad40630
Author: Liam Clarke-Hutchinson <[email protected]>
AuthorDate: Thu Mar 24 16:54:05 2022 +1300
KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
Reviewers: David Jacot <[email protected]>, Luke Chen <[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 22 +++++++++++++---------
.../server/DynamicBrokerReconfigurationTest.scala | 1 -
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index cb6cd84..2a4fd95 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.util
import java.util.{Collections, Properties}
+import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogConfig, LogManager}
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
private[server] val staticDefaultConfigs =
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
private val dynamicBrokerConfigs = mutable.Map[String, String]()
private val dynamicDefaultConfigs = mutable.Map[String, String]()
- private val reconfigurables = mutable.Buffer[Reconfigurable]()
- private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+ // Use COWArrayList to prevent concurrent modification exception when an
item is added by one thread to these
+ // collections, while another thread is iterating over them.
+ private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+ private val brokerReconfigurables = new
CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
private val dynamicConfigPasswordEncoder =
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
@@ -259,16 +263,16 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
def addReconfigurable(reconfigurable: Reconfigurable): Unit =
CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
- reconfigurables += reconfigurable
+ reconfigurables.add(reconfigurable)
}
def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit =
CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
- brokerReconfigurables += reconfigurable
+ brokerReconfigurables.add(reconfigurable)
}
def removeReconfigurable(reconfigurable: Reconfigurable): Unit =
CoreUtils.inWriteLock(lock) {
- reconfigurables -= reconfigurable
+ reconfigurables.remove(reconfigurable)
}
private def verifyReconfigurableConfigs(configNames: Set[String]): Unit =
CoreUtils.inWriteLock(lock) {
@@ -320,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
* changes are processed. At the moment, only listener configs are
considered for reloading.
*/
private[server] def reloadUpdatedFilesWithoutConfigChange(newProps:
Properties): Unit = CoreUtils.inWriteLock(lock) {
- reconfigurables
+ reconfigurables.asScala
.filter(reconfigurable =>
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
.foreach {
case reconfigurable: ListenerReconfigurable =>
@@ -535,8 +539,8 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
try {
val customConfigs = new util.HashMap[String,
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
- newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
- reconfigurables.foreach {
+ newConfig.valuesFromThisConfig.keySet.forEach(k =>
customConfigs.remove(k))
+ reconfigurables.forEach {
case listenerReconfigurable: ListenerReconfigurable =>
processListenerReconfigurable(listenerReconfigurable, newConfig,
customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
@@ -546,7 +550,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
// BrokerReconfigurable updates are processed after config is updated.
Only do the validation here.
val brokerReconfigurablesToUpdate =
mutable.Buffer[BrokerReconfigurable]()
- brokerReconfigurables.foreach { reconfigurable =>
+ brokerReconfigurables.forEach { reconfigurable =>
if
(needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava,
changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 1d64106..dee4f01 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -728,7 +728,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
}
@Test
- @Disabled // TODO: To be re-enabled once we can make it less flaky
(KAFKA-13672)
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "data-plane-kafka-request-handler-"
val networkThreadPrefix = "data-plane-kafka-network-thread-"