This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28430c31f9e MINOR: Replace ClientPropertiesBuilder with admin() factory methods in KafkaClusterTestKit (#21146)
28430c31f9e is described below

commit 28430c31f9ea54eb7e232666d3893355ec7ff059
Author: majialong <[email protected]>
AuthorDate: Fri Dec 19 15:40:51 2025 +0800

    MINOR: Replace ClientPropertiesBuilder with admin() factory methods in KafkaClusterTestKit (#21146)
    
    This PR simplifies `KafkaClusterTestKit` by removing
    `ClientPropertiesBuilder` and adding direct `admin()` factory methods
    that align with the `ClusterInstance` style.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/KRaftClusterTest.scala            | 42 +++++--------
 .../metadata/BrokerMetadataPublisherTest.scala     |  6 +-
 .../org/apache/kafka/server/KRaftClusterTest.java  | 28 +++------
 .../ReconfigurableQuorumIntegrationTest.java       | 16 ++---
 .../kafka/common/test/KafkaClusterTestKit.java     | 70 ++++++++++++----------
 .../tools/other/ReplicationQuotasTestRig.java      |  2 +-
 6 files changed, 74 insertions(+), 90 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 0b18c68aa16..40d00eb73b4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -49,7 +49,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{FileSystems, Files, Path, Paths}
 import java.util
 import java.util.concurrent.{ExecutionException, TimeUnit}
-import java.util.{Optional, OptionalLong, Properties}
+import java.util.{Optional, OptionalLong}
 import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.Using
@@ -97,7 +97,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         // Create the topic.
         val assignments = new util.HashMap[Integer, util.List[Integer]]
@@ -265,7 +265,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
           (new ConfigResource(Type.BROKER, ""), Seq(
@@ -322,7 +322,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         Seq(log, log2).foreach(_.debug("setting log4j"))
 
@@ -378,7 +378,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         val createResults = admin.createTopics(util.List.of(
           new NewTopic("foo", 1, 3.toShort),
@@ -400,13 +400,7 @@ class KRaftClusterTest {
   }
 
   def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController: 
Boolean): Admin = {
-    var props: Properties = null
-    props = if (bootstrapController)
-      
cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build()
-    else
-      cluster.clientProperties()
-    props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
-    Admin.create(props)
+    cluster.admin(util.Map.of(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass.getName), bootstrapController)
   }
 
   @Test
@@ -523,7 +517,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         admin.updateFeatures(
           util.Map.of(MetadataVersion.FEATURE_NAME,
@@ -552,9 +546,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.newClientPropertiesBuilder().
-        setUsingBootstrapControllers(usingBootstrapControlers).
-        build())
+      val admin = cluster.admin(util.Map.of(), usingBootstrapControlers)
       try {
         val featureMetadata = admin.describeFeatures().featureMetadata().get()
         assertEquals(new SupportedVersionRange(0, 1),
@@ -590,7 +582,7 @@ class KRaftClusterTest {
       TestUtils.waitUntilTrue(() => 
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
         "RaftManager was not initialized.")
 
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         // Create a test topic
         val newTopic = util.List.of(new NewTopic("test-topic", 1, 1.toShort))
@@ -676,7 +668,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         val newTopics = new util.ArrayList[NewTopic]()
         for (i <- 0 to 10000) {
@@ -758,9 +750,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      val admin = Admin.create(cluster.newClientPropertiesBuilder().
-        setUsingBootstrapControllers(true).
-        build())
+      val admin = cluster.admin(util.Map.of(), true)
       try {
         val exception = assertThrows(classOf[ExecutionException],
           () => admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES))
@@ -814,7 +804,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         val broker0 = cluster.brokers().get(0)
         val broker1 = cluster.brokers().get(1)
@@ -869,7 +859,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         val broker0 = cluster.brokers().get(0)
         val broker1 = cluster.brokers().get(1)
@@ -934,7 +924,7 @@ class KRaftClusterTest {
     try {
       cluster.format()
       cluster.startup()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         val broker0 = cluster.brokers().get(0)
         val broker1 = cluster.brokers().get(1)
@@ -1007,7 +997,7 @@ class KRaftClusterTest {
       TestUtils.waitUntilTrue(() => 
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
         "RaftManager was not initialized.")
 
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         // Create a test topic
         admin.createTopics(util.List.of(
@@ -1087,7 +1077,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       cluster.waitForReadyBrokers()
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         admin.incrementalAlterConfigs(
           util.Map.of(new ConfigResource(Type.BROKER, ""),
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 90321acdced..dc3b987488f 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -27,7 +27,7 @@ import kafka.server.share.SharePartitionManager
 import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, 
NewTopic}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, NewTopic}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
@@ -124,7 +124,7 @@ class BrokerMetadataPublisherTest {
           override def answer(invocation: InvocationOnMock): Unit = 
numTimesReloadCalled.addAndGet(1)
         })
       broker.brokerMetadataPublisher.dynamicConfigPublisher = publisher
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         assertEquals(0, numTimesReloadCalled.get())
         admin.incrementalAlterConfigs(singletonMap(
@@ -170,7 +170,7 @@ class BrokerMetadataPublisherTest {
       
broker.sharedServer.loader.removeAndClosePublisher(broker.brokerMetadataPublisher).get(1,
 TimeUnit.MINUTES)
       broker.metadataPublishers.remove(broker.brokerMetadataPublisher)
       
broker.sharedServer.loader.installPublishers(List(publisher).asJava).get(1, 
TimeUnit.MINUTES)
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = cluster.admin()
       try {
         admin.createTopics(singletonList(new NewTopic("foo", 1, 
1.toShort))).all().get()
       } finally {
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java 
b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 3286d0f185c..475fb11cd87 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.server;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterClientQuotasResult;
@@ -74,7 +73,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -142,7 +140,7 @@ public class KRaftClusterTest {
                 "Broker never made it to RUNNING state.");
             TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
                 "RaftManager was not initialized.");
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 assertEquals(cluster.nodes().clusterId(),
                     admin.describeCluster().clusterId().get());
             }
@@ -163,7 +161,7 @@ public class KRaftClusterTest {
                 "Broker never made it to RUNNING state.");
             TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
                 "RaftManager was not initialized.");
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 assertEquals(cluster.nodes().clusterId(),
                     admin.describeCluster().clusterId().get());
             }
@@ -224,7 +222,7 @@ public class KRaftClusterTest {
             cluster.waitForReadyBrokers();
             assertConfigValue(cluster, 0);
 
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 admin.incrementalAlterConfigs(
                     Map.of(new ConfigResource(Type.BROKER, ""),
                         List.of(new AlterConfigOp(
@@ -264,7 +262,7 @@ public class KRaftClusterTest {
 
             assertFoobarValue(cluster, 0);
 
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 admin.incrementalAlterConfigs(
                     Map.of(new ConfigResource(Type.BROKER, ""),
                         List.of(new AlterConfigOp(
@@ -304,7 +302,7 @@ public class KRaftClusterTest {
                 "RaftManager was not initialized.");
 
             String testTopic = "test-topic";
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 // Create a test topic
                 List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1, 
(short) 3));
                 CreateTopicsResult createTopicResult = 
admin.createTopics(newTopic);
@@ -336,7 +334,7 @@ public class KRaftClusterTest {
             TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
                 "RaftManager was not initialized.");
 
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 // Create many topics
                 List<NewTopic> newTopics = List.of(
                     new NewTopic("test-topic-1", 2, (short) 3),
@@ -382,7 +380,7 @@ public class KRaftClusterTest {
             TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
                 "Broker never made it to RUNNING state.");
 
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 ClientQuotaEntity entity = new 
ClientQuotaEntity(Map.of("user", "testkit"));
                 ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
                     List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
@@ -470,7 +468,7 @@ public class KRaftClusterTest {
             TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
                 "Broker never made it to RUNNING state.");
 
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (Admin admin = cluster.admin()) {
                 ClientQuotaEntity defaultUser = new 
ClientQuotaEntity(Collections.singletonMap("user", null));
                 ClientQuotaEntity bobUser = new 
ClientQuotaEntity(Map.of("user", "bob"));
 
@@ -643,15 +641,7 @@ public class KRaftClusterTest {
     }
 
     private Admin createAdminClient(KafkaClusterTestKit cluster, boolean 
usingBootstrapControllers) {
-        Properties props = new Properties();
-        if (usingBootstrapControllers) {
-            props.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
cluster.bootstrapControllers());
-            props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-        } else {
-            props = cluster.clientProperties();
-        }
-        props.put(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName());
-        return Admin.create(props);
+        return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName()), usingBootstrapControllers);
     }
 
     public static class BadAuthorizer implements Authorizer {
diff --git 
a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
 
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
index ad30708d7c5..c3a14875cfa 100644
--- 
a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -76,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).build()) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, 
KRaftVersion.KRAFT_VERSION_0.featureLevel());
                 });
@@ -94,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, 
KRaftVersion.KRAFT_VERSION_1.featureLevel());
                 });
@@ -132,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -167,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002, 3003), 
voters.keySet());
@@ -206,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -244,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -275,7 +275,7 @@ public class ReconfigurableQuorumIntegrationTest {
         try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -327,7 +327,7 @@ public class ReconfigurableQuorumIntegrationTest {
         try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
             cluster.format();
             cluster.startup();
-            try (var admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = cluster.admin()) {
                 Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
                 var removeFuture = admin.removeRaftVoter(
                     3000,
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 471f4472e8d..4cf4e0770d0 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -26,8 +26,10 @@ import kafka.server.KafkaRaftServer;
 import kafka.server.SharedServer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
@@ -66,7 +68,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -560,45 +561,48 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             "Failed to wait for publisher to publish the metadata update to 
each broker.");
     }
 
-    public class ClientPropertiesBuilder {
-        private final Properties properties;
-        private boolean usingBootstrapControllers = false;
-
-        public ClientPropertiesBuilder() {
-            this.properties = new Properties();
-        }
-
-        public ClientPropertiesBuilder(Properties properties) {
-            this.properties = properties;
-        }
-
-        public ClientPropertiesBuilder setUsingBootstrapControllers(boolean 
usingBootstrapControllers) {
-            this.usingBootstrapControllers = usingBootstrapControllers;
-            return this;
-        }
+    public Admin admin() {
+        return admin(Map.of(), false);
+    }
 
-        public Properties build() {
-            if (usingBootstrapControllers) {
-                
properties.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
bootstrapControllers());
-                
properties.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-            } else {
-                
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
-                
properties.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
-            }
-            return properties;
-        }
+    public Admin admin(Map<String, Object> configs) {
+        return admin(configs, false);
     }
 
-    public ClientPropertiesBuilder newClientPropertiesBuilder(Properties 
properties) {
-        return new ClientPropertiesBuilder(properties);
+    public Admin admin(Map<String, Object> configs, boolean 
usingBootstrapControllers) {
+        Map<String, Object> props = new HashMap<>(configs);
+        setBootstrapConfig(props, usingBootstrapControllers);
+        setClientSaslConfig(props, usingBootstrapControllers);
+        return Admin.create(props);
     }
 
-    public ClientPropertiesBuilder newClientPropertiesBuilder() {
-        return new ClientPropertiesBuilder();
+    private void setBootstrapConfig(Map<String, Object> props, boolean 
usingBootstrapControllers) {
+        if (usingBootstrapControllers) {
+            props.putIfAbsent(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
bootstrapControllers());
+            props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        } else {
+            props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
+            props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+        }
     }
 
-    public Properties clientProperties() {
-        return new ClientPropertiesBuilder().build();
+    private void setClientSaslConfig(Map<String, Object> props, boolean 
usingBootstrapControllers) {
+        SecurityProtocol protocol = usingBootstrapControllers ?
+            nodes.controllerListenerProtocol() : 
nodes.brokerListenerProtocol();
+
+        props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol.name);
+
+        if (protocol == SecurityProtocol.SASL_PLAINTEXT) {
+            props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
+            props.putIfAbsent(
+                SaslConfigs.SASL_JAAS_CONFIG,
+                String.format(
+                    "org.apache.kafka.common.security.plain.PlainLoginModule 
required username=\"%s\" password=\"%s\";",
+                    JaasUtils.KAFKA_PLAIN_ADMIN,
+                    JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
+                )
+            );
+        }
     }
 
     public String bootstrapServers() {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
 
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index c2294be8ed5..8aa6dd8bc46 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -182,7 +182,7 @@ public class ReplicationQuotasTestRig {
                 throw new RuntimeException("Failed to start test Kafka 
cluster", e);
             }
 
-            adminClient = Admin.create(cluster.clientProperties());
+            adminClient = cluster.admin();
         }
 
         public void tearDown() {

Reply via email to