This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55260e9835a KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest to clients-integration-tests module (#20339)
55260e9835a is described below

commit 55260e9835ae74c7eb1e1c54e90b7f00f6e278d2
Author: Jhen-Yung Hsu <jhenyung...@gmail.com>
AuthorDate: Fri Aug 15 17:44:47 2025 +0800

    KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest to clients-integration-tests module (#20339)
    
    This PR does the following:
    
    - Rewrite to new test infra.
    - Rewrite to java.
    - Move to clients-integration-tests.
    - Add `ensureConsistentMetadata` method to `ClusterInstance`,
    similar to `ensureConsistentKRaftMetadata` in the old infra, and
    refactor related code (a usage sketch follows below).
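    
    A minimal sketch of the intended usage, assuming a test class wired
    like the one in this PR (ClusterInstance injected via the
    constructor, imports as in the new test file); the topic name and
    config value are illustrative:
    
        @ClusterTest
        public void testAlterThenDescribe() throws Exception {
            try (Admin admin = clusterInstance.admin()) {
                clusterInstance.createTopic("example-topic", 1, (short) 1);
                ConfigResource resource =
                    new ConfigResource(ConfigResource.Type.TOPIC, "example-topic");
                admin.incrementalAlterConfigs(Map.of(resource, List.of(
                    new AlterConfigOp(
                        new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "60000"),
                        OpType.SET)))).all().get();
                // Block until every broker has caught up to the controllers'
                // committed metadata offset, so describeConfigs sees the update.
                clusterInstance.ensureConsistentMetadata();
                Config described = admin.describeConfigs(List.of(resource))
                    .all().get().get(resource);
                assertEquals("60000",
                    described.get(TopicConfig.RETENTION_MS_CONFIG).value());
            }
        }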
    
    Reviewers: TengYao Chi <frankvi...@apache.org>, Ken Huang <s7133...@gmail.com>
---
 .../AdminClientWithPoliciesIntegrationTest.java    | 201 +++++++++++++++++
 .../AdminClientWithPoliciesIntegrationTest.scala   | 247 ---------------------
 .../apache/kafka/common/test/ClusterInstance.java  |  17 +-
 3 files changed, 211 insertions(+), 254 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java
new file mode 100644
index 00000000000..8bad3b1c900
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.policy.AlterConfigPolicy;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests AdminClient calls when the broker is configured with policies - AlterConfigPolicy.
+ */
+
+@ClusterTestDefaults(
+    brokers = 3,
+    serverProperties = {
+        @ClusterConfigProperty(key = ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, value = "org.apache.kafka.clients.admin.AdminClientWithPoliciesIntegrationTest$Policy"),
+    }
+)
+public class AdminClientWithPoliciesIntegrationTest {
+    private final ClusterInstance clusterInstance;
+    private static List<AlterConfigPolicy.RequestMetadata> validations = new ArrayList<>();
+
+    AdminClientWithPoliciesIntegrationTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        clusterInstance.waitForReadyBrokers();
+    }
+
+    @ClusterTest
+    public void testInvalidAlterConfigsDueToPolicy() throws Exception {
+        try (final Admin adminClient = clusterInstance.admin()) {
+            // Create topics
+            String topic1 = "invalid-alter-configs-due-to-policy-topic-1";
+            String topic2 = "invalid-alter-configs-due-to-policy-topic-2";
+            String topic3 = "invalid-alter-configs-due-to-policy-topic-3";
+            clusterInstance.createTopic(topic1, 1, (short) 1);
+            clusterInstance.createTopic(topic2, 1, (short) 1);
+            clusterInstance.createTopic(topic3, 1, (short) 1);
+
+            ConfigResource topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1);
+            ConfigResource topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2);
+            ConfigResource topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3);
+
+            // Set a mutable broker config
+            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); // "0" represents the broker ID
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(
+                    brokerResource, List.of(new AlterConfigOp(new ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET))
+            );
+            adminClient.incrementalAlterConfigs(configOps).all().get();
+            assertEquals(Set.of(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG), validationsForResource(brokerResource).get(0).configs().keySet());
+            validations.clear();
+
+            Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new HashMap<>();
+            alterConfigs.put(topicResource1, List.of(
+                    new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
+                    new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET)
+            ));
+            alterConfigs.put(topicResource2, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET)));
+            alterConfigs.put(topicResource3, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET)));
+            alterConfigs.put(brokerResource, List.of(new AlterConfigOp(new ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET)));
+
+            // Alter configs: second is valid, the others are invalid
+            AlterConfigsResult alterResult = adminClient.incrementalAlterConfigs(alterConfigs);
+            assertEquals(Set.of(topicResource1, topicResource2, topicResource3, brokerResource), alterResult.values().keySet());
+            assertFutureThrows(PolicyViolationException.class, alterResult.values().get(topicResource1));
+            alterResult.values().get(topicResource2).get();
+            assertFutureThrows(InvalidConfigurationException.class, alterResult.values().get(topicResource3));
+            assertFutureThrows(InvalidRequestException.class, alterResult.values().get(brokerResource));
+            assertTrue(validationsForResource(brokerResource).isEmpty(),
+                    "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.");
+            validations.clear();
+
+            // Verify that the second resource was updated and the others were not
+            clusterInstance.ensureConsistentMetadata();
+            DescribeConfigsResult describeResult = adminClient.describeConfigs(List.of(topicResource1, topicResource2, topicResource3, brokerResource));
+            var configs = describeResult.all().get();
+            assertEquals(4, configs.size());
+
+            assertEquals(String.valueOf(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO), configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+            assertEquals(String.valueOf(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT), configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
+
+            assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+
+            assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value());
+
+            // Alter configs with validateOnly = true: only second is valid
+            alterConfigs.put(topicResource2, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET)));
+            alterResult = adminClient.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true));
+
+            assertFutureThrows(PolicyViolationException.class, alterResult.values().get(topicResource1));
+            alterResult.values().get(topicResource2).get();
+            assertFutureThrows(InvalidConfigurationException.class, alterResult.values().get(topicResource3));
+            assertFutureThrows(InvalidRequestException.class, alterResult.values().get(brokerResource));
+            assertTrue(validationsForResource(brokerResource).isEmpty(),
+                    "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.");
+            validations.clear();
+
+            // Verify that no resources are updated since validate_only = true
+            clusterInstance.ensureConsistentMetadata();
+            describeResult = adminClient.describeConfigs(List.of(topicResource1, topicResource2, topicResource3, brokerResource));
+            configs = describeResult.all().get();
+            assertEquals(4, configs.size());
+
+            assertEquals(String.valueOf(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO), configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+            assertEquals(String.valueOf(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT), configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
+
+            assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value());
+
+            assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value());
+
+            // Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy
+            alterResult = adminClient.incrementalAlterConfigs(Map.of(
+                    brokerResource, List.of(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "9999"), OpType.SET))
+            ));
+            alterResult.all().get();
+            assertEquals(Set.of(SocketServerConfigs.MAX_CONNECTIONS_CONFIG), validationsForResource(brokerResource).get(0).configs().keySet());
+        }
+    }
+
+    private static List<AlterConfigPolicy.RequestMetadata> validationsForResource(ConfigResource resource) {
+        return validations.stream().filter(req -> req.resource().equals(resource)).toList();
+    }
+
+    /**
+     * Used in @ClusterTestDefaults serverProperties, so it may appear unused in the IDE.
+     */
+    public static class Policy implements AlterConfigPolicy {
+        private Map<String, ?> configs;
+        private boolean closed = false;
+
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            validations.clear();
+            this.configs = configs;
+        }
+
+        @Override
+        public void validate(AlterConfigPolicy.RequestMetadata requestMetadata) {
+            validations.add(requestMetadata);
+            assertFalse(closed, "Policy should not be closed");
+            assertFalse(configs.isEmpty(), "configure should have been called with non empty configs");
+            assertFalse(requestMetadata.configs().isEmpty(), "request configs should not be empty");
+            assertFalse(requestMetadata.resource().name().isEmpty(), "resource name should not be empty");
+            if (requestMetadata.configs().containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+                throw new PolicyViolationException("Min in sync replicas cannot be updated");
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            this.closed = true;
+        }
+    }
+}
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
deleted file mode 100644
index e13e2655e83..00000000000
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-
-package kafka.api
-
-import java.util
-import java.util.Properties
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsOptions, ConfigEntry}
-import org.apache.kafka.common.config.{ConfigResource, SslConfigs, TopicConfig}
-import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
-import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.apache.kafka.storage.internals.log.LogConfig
-import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-/**
-  * Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc.
-  */
-@Timeout(120)
-class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging {
-
-  import AdminClientWithPoliciesIntegrationTest._
-
-  var client: Admin = _
-  val brokerCount = 3
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    TestUtils.waitUntilBrokerMetadataIsPropagated(brokers)
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    if (client != null)
-      Utils.closeQuietly(client, "AdminClient")
-    super.tearDown()
-  }
-
-  def createConfig: util.Map[String, Object] =
-    util.Map.of[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
-
-  override def generateConfigs: collection.Seq[KafkaConfig] = {
-    val configs = TestUtils.createBrokerConfigs(brokerCount)
-    configs.foreach(overrideNodeConfigs)
-    configs.map(KafkaConfig.fromProps)
-  }
-
-  override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
-    val props = new Properties()
-    overrideNodeConfigs(props)
-    Seq(props)
-  }
-
-  private def overrideNodeConfigs(props: Properties): Unit = {
-    props.put(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[Policy])
-  }
-
-  @Test
-  def testValidAlterConfigs(): Unit = {
-    client = Admin.create(createConfig)
-    // Create topics
-    val topic1 = "describe-alter-configs-topic-1"
-    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    val topicConfig1 = new Properties
-    val maxMessageBytes = "500000"
-    val retentionMs = "60000000"
-    topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes)
-    topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs)
-    createTopic(topic1, 1, 1, topicConfig1)
-
-    val topic2 = "describe-alter-configs-topic-2"
-    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    createTopic(topic2)
-
-    PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, topicResource1, topicResource2, maxMessageBytes, retentionMs)
-  }
-
-  @Test
-  def testInvalidAlterConfigs(): Unit = {
-    client = Admin.create(createConfig)
-    PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
-  }
-
-  @Test
-  def testInvalidAlterConfigsDueToPolicy(): Unit = {
-    client = Admin.create(createConfig)
-
-    // Create topics
-    val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
-    val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    createTopic(topic1)
-
-    val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
-    val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    createTopic(topic2)
-
-    val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
-    val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
-    createTopic(topic3)
-
-    // Set a mutable broker config
-    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString)
-    var alterResult = client.incrementalAlterConfigs(util.Map.of(brokerResource,
-      util.List.of(new AlterConfigOp(new ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET))))
-    alterResult.all.get
-    assertEquals(Set(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG), validationsForResource(brokerResource).head.configs().keySet().asScala)
-    validations.clear()
-
-    val alterConfigs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
-    alterConfigs.put(topicResource1, util.List.of(
-      new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET),
-      new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET)
-    ))
-
-    alterConfigs.put(topicResource2, util.List.of(
-      new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET),
-    ))
-
-    alterConfigs.put(topicResource3, util.List.of(
-      new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET),
-    ))
-
-    alterConfigs.put(brokerResource, util.List.of(
-      new AlterConfigOp(new ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET),
-    ))
-
-    // Alter configs: second is valid, the others are invalid
-    alterResult = client.incrementalAlterConfigs(alterConfigs)
-
-    assertEquals(util.Set.of(topicResource1, topicResource2, topicResource3, brokerResource), alterResult.values.keySet)
-    assertFutureThrows(classOf[PolicyViolationException], alterResult.values.get(topicResource1))
-    alterResult.values.get(topicResource2).get
-    assertFutureThrows(classOf[InvalidConfigurationException], alterResult.values.get(topicResource3))
-    assertFutureThrows(classOf[InvalidRequestException], alterResult.values.get(brokerResource))
-    assertTrue(validationsForResource(brokerResource).isEmpty,
-      "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
-    validations.clear()
-
-    // Verify that the second resource was updated and the others were not
-    ensureConsistentKRaftMetadata()
-    var describeResult = client.describeConfigs(util.List.of(topicResource1, topicResource2, topicResource3, brokerResource))
-    var configs = describeResult.all.get
-    assertEquals(4, configs.size)
-
-    assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString, configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-    assertEquals(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT.toString, configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value)
-
-    assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-
-    assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
-
-    // Alter configs with validateOnly = true: only second is valid
-    alterConfigs.put(topicResource2, util.List.of(
-      new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET),
-    ))
-
-    alterResult = client.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true))
-
-    assertEquals(util.Set.of(topicResource1, topicResource2, topicResource3, brokerResource), alterResult.values.keySet)
-    assertFutureThrows(classOf[PolicyViolationException], alterResult.values.get(topicResource1))
-    alterResult.values.get(topicResource2).get
-    assertFutureThrows(classOf[InvalidConfigurationException], alterResult.values.get(topicResource3))
-    assertFutureThrows(classOf[InvalidRequestException], alterResult.values.get(brokerResource))
-    assertTrue(validationsForResource(brokerResource).isEmpty,
-      "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.")
-    validations.clear()
-
-    // Verify that no resources are updated since validate_only = true
-    ensureConsistentKRaftMetadata()
-    describeResult = client.describeConfigs(util.List.of(topicResource1, topicResource2, topicResource3, brokerResource))
-    configs = describeResult.all.get
-    assertEquals(4, configs.size)
-
-    assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString, configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-    assertEquals(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT.toString, configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value)
-
-    assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-
-    assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value)
-
-    // Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy
-    alterResult = client.incrementalAlterConfigs(util.Map.of(
-      brokerResource ,
-        util.List.of(new AlterConfigOp(
-          new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "9999"), OpType.SET)
-        )
-    ))
-    alterResult.all.get
-    assertEquals(Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG), validationsForResource(brokerResource).head.configs().keySet().asScala)
-  }
-
-}
-
-object AdminClientWithPoliciesIntegrationTest {
-
-  val validations = new mutable.ListBuffer[AlterConfigPolicy.RequestMetadata]()
-
-  def validationsForResource(resource: ConfigResource): Seq[AlterConfigPolicy.RequestMetadata] = {
-    validations.filter { req => req.resource().equals(resource) }.toSeq
-  }
-
-  class Policy extends AlterConfigPolicy {
-
-    var configs: Map[String, _] = _
-    var closed = false
-
-    def configure(configs: util.Map[String, _]): Unit = {
-      validations.clear()
-      this.configs = configs.asScala.toMap
-    }
-
-    def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = {
-      validations.append(requestMetadata)
-      require(!closed, "Policy should not be closed")
-      require(configs.nonEmpty, "configure should have been called with non empty configs")
-      require(!requestMetadata.configs.isEmpty, "request configs should not be empty")
-      require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty")
-      if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG))
-        throw new PolicyViolationException("Min in sync replicas cannot be updated")
-    }
-
-    def close(): Unit = closed = true
-
-  }
-}
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 7662eeda7a3..471692d0016 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -273,12 +273,7 @@ public interface ClusterInstance {
                broker -> broker.metadataCache().numPartitions(topic).isEmpty()),
                 60000L, topic + " metadata not propagated after 60000 ms");
 
-        for (ControllerServer controller : controllers().values()) {
-            long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1;
-            TestUtils.waitForCondition(
-                () -> brokers.stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
-                60000L, "Timeout waiting for controller metadata propagating to brokers");
-        }
+        ensureConsistentMetadata(brokers, controllers().values());
 
         TopicPartition topicPartition = new TopicPartition(topic, 0);
 
@@ -358,7 +353,15 @@ public interface ClusterInstance {
             () -> brokers.stream().allMatch(broker -> broker.metadataCache().numPartitions(topic).filter(p -> p == partitions).isPresent()),
                 60000L, topic + " metadata not propagated after 60000 ms");
 
-        for (ControllerServer controller : controllers().values()) {
+        ensureConsistentMetadata(brokers, controllers().values());
+    }
+
+    default void ensureConsistentMetadata() throws InterruptedException  {
+        ensureConsistentMetadata(aliveBrokers().values(), controllers().values());
+    }
+
+    default void ensureConsistentMetadata(Collection<KafkaBroker> brokers, Collection<ControllerServer> controllers) throws InterruptedException  {
+        for (ControllerServer controller : controllers) {
             long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1;
             TestUtils.waitForCondition(
                 () -> brokers.stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
