This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 5d3af5c  KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
5d3af5c is described below

commit 5d3af5cc5f1cf4112b91ca9754d3de7d6ad40630
Author: Liam Clarke-Hutchinson <[email protected]>
AuthorDate: Thu Mar 24 16:54:05 2022 +1300

    KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
    
    Reviewers: David Jacot <[email protected]>, Luke Chen <[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 22 +++++++++++++---------
 .../server/DynamicBrokerReconfigurationTest.scala  |  1 -
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index cb6cd84..2a4fd95 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util
 import java.util.{Collections, Properties}
+import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogConfig, LogManager}
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
-  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when an 
item is added by one thread to these
+  // collections, while another thread is iterating over them.
+  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+  private val brokerReconfigurables = new 
CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = 
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
@@ -259,16 +263,16 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
-    reconfigurables += reconfigurable
+    reconfigurables.add(reconfigurable)
   }
 
   def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
-    brokerReconfigurables += reconfigurable
+    brokerReconfigurables.add(reconfigurable)
   }
 
   def removeReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
-    reconfigurables -= reconfigurable
+    reconfigurables.remove(reconfigurable)
   }
 
   private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = 
CoreUtils.inWriteLock(lock) {
@@ -320,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
    * changes are processed. At the moment, only listener configs are 
considered for reloading.
    */
   private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: 
Properties): Unit = CoreUtils.inWriteLock(lock) {
-    reconfigurables
+    reconfigurables.asScala
       .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
       .foreach {
         case reconfigurable: ListenerReconfigurable =>
@@ -535,8 +539,8 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-        newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-        reconfigurables.foreach {
+        newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+        reconfigurables.forEach {
           case listenerReconfigurable: ListenerReconfigurable =>
             processListenerReconfigurable(listenerReconfigurable, newConfig, 
customConfigs, validateOnly, reloadOnly = false)
           case reconfigurable =>
@@ -546,7 +550,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 
         // BrokerReconfigurable updates are processed after config is updated. 
Only do the validation here.
         val brokerReconfigurablesToUpdate = 
mutable.Buffer[BrokerReconfigurable]()
-        brokerReconfigurables.foreach { reconfigurable =>
+        brokerReconfigurables.forEach { reconfigurable =>
           if 
(needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, 
changeMap.keySet, deletedKeySet)) {
             reconfigurable.validateReconfiguration(newConfig)
             if (!validateOnly)
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 1d64106..dee4f01 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -728,7 +728,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
   }
 
   @Test
-  @Disabled // TODO: To be re-enabled once we can make it less flaky 
(KAFKA-13672)
   def testThreadPoolResize(): Unit = {
     val requestHandlerPrefix = "data-plane-kafka-request-handler-"
     val networkThreadPrefix = "data-plane-kafka-network-thread-"

Reply via email to