This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85c7227bf2a MINOR: fix DynamicBrokerReconfigurationTest (#22226)
85c7227bf2a is described below

commit 85c7227bf2a64178b5217788b12f6cf3ff0cea8b
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed May 6 14:23:35 2026 -0700

    MINOR: fix DynamicBrokerReconfigurationTest (#22226)
    
    The test is subject to a race condition. This PR changes the async
    alterConfigs call to a synchronous one to fix it.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala   | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a99561508ff..c3706c7bb0c 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1262,7 +1262,18 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
   }
 
   private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, 
props: Properties,
-                   perBrokerConfig: Boolean): AlterConfigsResult = {
+                           perBrokerConfig: Boolean): Unit = {
+    alterConfigsAsync(servers, adminClient, props, perBrokerConfig).all.get()
+    // Skip config-provider placeholders (e.g. "${file:...}"): the broker 
stores the resolved value,
+    // so waiting on the literal placeholder would never match. Callers using 
placeholders must wait
+    // on the resolved value themselves.
+    props.asScala.foreach { case (k, v) =>
+      if (!v.contains("${")) waitForConfig(k, v)
+    }
+  }
+
+  private def alterConfigsAsync(servers: Seq[KafkaBroker], adminClient: Admin, 
props: Properties,
+                                perBrokerConfig: Boolean): AlterConfigsResult 
= {
     val configEntries = props.asScala.map { case (k, v) => new 
AlterConfigOp(new ConfigEntry(k, v), OpType.SET) }.toList.asJava
     val configs = if (perBrokerConfig) {
       val alterConfigs = new java.util.HashMap[ConfigResource, 
java.util.Collection[AlterConfigOp]]()
@@ -1277,7 +1288,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
   }
 
   private def reconfigureServers(newProps: Properties, perBrokerConfig: 
Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit 
= {
-    val alterResult = alterConfigs(servers, adminClients.head, newProps, 
perBrokerConfig)
+    val alterResult = alterConfigsAsync(servers, adminClients.head, newProps, 
perBrokerConfig)
     if (expectFailure) {
       val oldProps = servers.head.config.values.asScala.filter { case (k, _) 
=> newProps.containsKey(k) }
       val brokerResources = if (perBrokerConfig)

Reply via email to