This is an automated email from the ASF dual-hosted git repository. frankvicky pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 55260e9835a KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest to clients-integration-tests module (#20339) 55260e9835a is described below commit 55260e9835ae74c7eb1e1c54e90b7f00f6e278d2 Author: Jhen-Yung Hsu <jhenyung...@gmail.com> AuthorDate: Fri Aug 15 17:44:47 2025 +0800 KAFKA-19042: Move AdminClientWithPoliciesIntegrationTest to clients-integration-tests module (#20339) This PR does the following: - Rewrite to use the new test infra. - Rewrite in Java. - Move to clients-integration-tests. - Add `ensureConsistentMetadata` method to `ClusterInstance`, similar to `ensureConsistentKRaftMetadata` in the old infra, and refactor the related code. Reviewers: TengYao Chi <frankvi...@apache.org>, Ken Huang <s7133...@gmail.com> --- .../AdminClientWithPoliciesIntegrationTest.java | 201 +++++++++++++++++ .../AdminClientWithPoliciesIntegrationTest.scala | 247 --------------------- .../apache/kafka/common/test/ClusterInstance.java | 17 +- 3 files changed, 211 insertions(+), 254 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java new file mode 100644 index 00000000000..8bad3b1c900 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientWithPoliciesIntegrationTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.policy.AlterConfigPolicy; +import org.apache.kafka.storage.internals.log.LogConfig; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests AdminClient calls when the broker is configured with policies - 
AlterConfigPolicy. + */ + +@ClusterTestDefaults( + brokers = 3, + serverProperties = { + @ClusterConfigProperty(key = ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, value = "org.apache.kafka.clients.admin.AdminClientWithPoliciesIntegrationTest$Policy"), + } +) +public class AdminClientWithPoliciesIntegrationTest { + private final ClusterInstance clusterInstance; + private static List<AlterConfigPolicy.RequestMetadata> validations = new ArrayList<>(); + + AdminClientWithPoliciesIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + clusterInstance.waitForReadyBrokers(); + } + + @ClusterTest + public void testInvalidAlterConfigsDueToPolicy() throws Exception { + try (final Admin adminClient = clusterInstance.admin()) { + // Create topics + String topic1 = "invalid-alter-configs-due-to-policy-topic-1"; + String topic2 = "invalid-alter-configs-due-to-policy-topic-2"; + String topic3 = "invalid-alter-configs-due-to-policy-topic-3"; + clusterInstance.createTopic(topic1, 1, (short) 1); + clusterInstance.createTopic(topic2, 1, (short) 1); + clusterInstance.createTopic(topic3, 1, (short) 1); + + ConfigResource topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1); + ConfigResource topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2); + ConfigResource topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3); + + // Set a mutable broker config + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); // "0" represents the broker ID + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of( + brokerResource, List.of(new AlterConfigOp(new ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET)) + ); + adminClient.incrementalAlterConfigs(configOps).all().get(); + assertEquals(Set.of(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG), 
validationsForResource(brokerResource).get(0).configs().keySet()); + validations.clear(); + + Map<ConfigResource, Collection<AlterConfigOp>> alterConfigs = new HashMap<>(); + alterConfigs.put(topicResource1, List.of( + new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET), + new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET) + )); + alterConfigs.put(topicResource2, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET))); + alterConfigs.put(topicResource3, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET))); + alterConfigs.put(brokerResource, List.of(new AlterConfigOp(new ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET))); + + // Alter configs: second is valid, the others are invalid + AlterConfigsResult alterResult = adminClient.incrementalAlterConfigs(alterConfigs); + assertEquals(Set.of(topicResource1, topicResource2, topicResource3, brokerResource), alterResult.values().keySet()); + assertFutureThrows(PolicyViolationException.class, alterResult.values().get(topicResource1)); + alterResult.values().get(topicResource2).get(); + assertFutureThrows(InvalidConfigurationException.class, alterResult.values().get(topicResource3)); + assertFutureThrows(InvalidRequestException.class, alterResult.values().get(brokerResource)); + assertTrue(validationsForResource(brokerResource).isEmpty(), + "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated."); + validations.clear(); + + // Verify that the second resource was updated and the others were not + clusterInstance.ensureConsistentMetadata(); + DescribeConfigsResult describeResult = adminClient.describeConfigs(List.of(topicResource1, topicResource2, topicResource3, brokerResource)); + var configs = describeResult.all().get(); + assertEquals(4, 
configs.size()); + + assertEquals(String.valueOf(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO), configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value()); + assertEquals(String.valueOf(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT), configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value()); + + assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value()); + + assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value()); + + // Alter configs with validateOnly = true: only second is valid + alterConfigs.put(topicResource2, List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET))); + alterResult = adminClient.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true)); + + assertFutureThrows(PolicyViolationException.class, alterResult.values().get(topicResource1)); + alterResult.values().get(topicResource2).get(); + assertFutureThrows(InvalidConfigurationException.class, alterResult.values().get(topicResource3)); + assertFutureThrows(InvalidRequestException.class, alterResult.values().get(brokerResource)); + assertTrue(validationsForResource(brokerResource).isEmpty(), + "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated."); + validations.clear(); + + // Verify that no resources are updated since validate_only = true + clusterInstance.ensureConsistentMetadata(); + describeResult = adminClient.describeConfigs(List.of(topicResource1, topicResource2, topicResource3, brokerResource)); + configs = describeResult.all().get(); + assertEquals(4, configs.size()); + + assertEquals(String.valueOf(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO), configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value()); + assertEquals(String.valueOf(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT), 
configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value()); + + assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value()); + + assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value()); + + // Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy + alterResult = adminClient.incrementalAlterConfigs(Map.of( + brokerResource, List.of(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "9999"), OpType.SET)) + )); + alterResult.all().get(); + assertEquals(Set.of(SocketServerConfigs.MAX_CONNECTIONS_CONFIG), validationsForResource(brokerResource).get(0).configs().keySet()); + } + } + + private static List<AlterConfigPolicy.RequestMetadata> validationsForResource(ConfigResource resource) { + return validations.stream().filter(req -> req.resource().equals(resource)).toList(); + } + + /** + * Used in @ClusterTestDefaults serverProperties, so it may appear unused in the IDE. 
+ */ + public static class Policy implements AlterConfigPolicy { + private Map<String, ?> configs; + private boolean closed = false; + + + @Override + public void configure(Map<String, ?> configs) { + validations.clear(); + this.configs = configs; + } + + @Override + public void validate(AlterConfigPolicy.RequestMetadata requestMetadata) { + validations.add(requestMetadata); + assertFalse(closed, "Policy should not be closed"); + assertFalse(configs.isEmpty(), "configure should have been called with non empty configs"); + assertFalse(requestMetadata.configs().isEmpty(), "request configs should not be empty"); + assertFalse(requestMetadata.resource().name().isEmpty(), "resource name should not be empty"); + if (requestMetadata.configs().containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { + throw new PolicyViolationException("Min in sync replicas cannot be updated"); + } + } + + @Override + public void close() throws Exception { + this.closed = true; + } + } +} diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala deleted file mode 100644 index e13e2655e83..00000000000 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ /dev/null @@ -1,247 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.util -import java.util.Properties -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.{Logging, TestUtils} -import org.apache.kafka.clients.admin.AlterConfigOp.OpType -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsOptions, ConfigEntry} -import org.apache.kafka.common.config.{ConfigResource, SslConfigs, TopicConfig} -import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException} -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs} -import org.apache.kafka.server.policy.AlterConfigPolicy -import org.apache.kafka.storage.internals.log.LogConfig -import org.apache.kafka.test.TestUtils.assertFutureThrows -import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} - -import scala.collection.mutable -import scala.jdk.CollectionConverters._ - -/** - * Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc. 
- */ -@Timeout(120) -class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging { - - import AdminClientWithPoliciesIntegrationTest._ - - var client: Admin = _ - val brokerCount = 3 - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - TestUtils.waitUntilBrokerMetadataIsPropagated(brokers) - } - - @AfterEach - override def tearDown(): Unit = { - if (client != null) - Utils.closeQuietly(client, "AdminClient") - super.tearDown() - } - - def createConfig: util.Map[String, Object] = - util.Map.of[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - - override def generateConfigs: collection.Seq[KafkaConfig] = { - val configs = TestUtils.createBrokerConfigs(brokerCount) - configs.foreach(overrideNodeConfigs) - configs.map(KafkaConfig.fromProps) - } - - override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = { - val props = new Properties() - overrideNodeConfigs(props) - Seq(props) - } - - private def overrideNodeConfigs(props: Properties): Unit = { - props.put(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[Policy]) - } - - @Test - def testValidAlterConfigs(): Unit = { - client = Admin.create(createConfig) - // Create topics - val topic1 = "describe-alter-configs-topic-1" - val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) - val topicConfig1 = new Properties - val maxMessageBytes = "500000" - val retentionMs = "60000000" - topicConfig1.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes) - topicConfig1.setProperty(TopicConfig.RETENTION_MS_CONFIG, retentionMs) - createTopic(topic1, 1, 1, topicConfig1) - - val topic2 = "describe-alter-configs-topic-2" - val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) - createTopic(topic2) - - PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, topicResource1, topicResource2, maxMessageBytes, retentionMs) - } - - @Test - def 
testInvalidAlterConfigs(): Unit = { - client = Admin.create(createConfig) - PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client) - } - - @Test - def testInvalidAlterConfigsDueToPolicy(): Unit = { - client = Admin.create(createConfig) - - // Create topics - val topic1 = "invalid-alter-configs-due-to-policy-topic-1" - val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) - createTopic(topic1) - - val topic2 = "invalid-alter-configs-due-to-policy-topic-2" - val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) - createTopic(topic2) - - val topic3 = "invalid-alter-configs-due-to-policy-topic-3" - val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3) - createTopic(topic3) - - // Set a mutable broker config - val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString) - var alterResult = client.incrementalAlterConfigs(util.Map.of(brokerResource, - util.List.of(new AlterConfigOp(new ConfigEntry(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "50000"), OpType.SET)))) - alterResult.all.get - assertEquals(Set(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG), validationsForResource(brokerResource).head.configs().keySet().asScala) - validations.clear() - - val alterConfigs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() - alterConfigs.put(topicResource1, util.List.of( - new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.9"), OpType.SET), - new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), OpType.SET) - )) - - alterConfigs.put(topicResource2, util.List.of( - new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.8"), OpType.SET), - )) - - alterConfigs.put(topicResource3, util.List.of( - new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "-1"), OpType.SET), - )) - - alterConfigs.put(brokerResource, util.List.of( - new AlterConfigOp(new 
ConfigEntry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "12313"), OpType.SET), - )) - - // Alter configs: second is valid, the others are invalid - alterResult = client.incrementalAlterConfigs(alterConfigs) - - assertEquals(util.Set.of(topicResource1, topicResource2, topicResource3, brokerResource), alterResult.values.keySet) - assertFutureThrows(classOf[PolicyViolationException], alterResult.values.get(topicResource1)) - alterResult.values.get(topicResource2).get - assertFutureThrows(classOf[InvalidConfigurationException], alterResult.values.get(topicResource3)) - assertFutureThrows(classOf[InvalidRequestException], alterResult.values.get(brokerResource)) - assertTrue(validationsForResource(brokerResource).isEmpty, - "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.") - validations.clear() - - // Verify that the second resource was updated and the others were not - ensureConsistentKRaftMetadata() - var describeResult = client.describeConfigs(util.List.of(topicResource1, topicResource2, topicResource3, brokerResource)) - var configs = describeResult.all.get - assertEquals(4, configs.size) - - assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString, configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value) - assertEquals(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT.toString, configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value) - - assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value) - - assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value) - - // Alter configs with validateOnly = true: only second is valid - alterConfigs.put(topicResource2, util.List.of( - new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.7"), OpType.SET), - )) - - alterResult = client.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true)) - - assertEquals(util.Set.of(topicResource1, topicResource2, topicResource3, brokerResource), alterResult.values.keySet) - assertFutureThrows(classOf[PolicyViolationException], alterResult.values.get(topicResource1)) - alterResult.values.get(topicResource2).get - assertFutureThrows(classOf[InvalidConfigurationException], alterResult.values.get(topicResource3)) - assertFutureThrows(classOf[InvalidRequestException], alterResult.values.get(brokerResource)) - assertTrue(validationsForResource(brokerResource).isEmpty, - "Should not see the broker resource in the AlterConfig policy when the broker configs are not being updated.") - validations.clear() - - // Verify that no resources are updated since validate_only = true - ensureConsistentKRaftMetadata() - describeResult = client.describeConfigs(util.List.of(topicResource1, topicResource2, topicResource3, brokerResource)) - configs = describeResult.all.get - assertEquals(4, configs.size) - - assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString, configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value) - assertEquals(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT.toString, configs.get(topicResource1).get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value) - - assertEquals("0.8", configs.get(topicResource2).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value) - - assertNull(configs.get(brokerResource).get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).value) - - // Do an incremental alter config on the broker, ensure we don't see the broker config we set earlier in the policy - alterResult = client.incrementalAlterConfigs(util.Map.of( - brokerResource , - util.List.of(new AlterConfigOp( - new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "9999"), OpType.SET) - ) - )) - alterResult.all.get - assertEquals(Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG), validationsForResource(brokerResource).head.configs().keySet().asScala) - } - -} - 
-object AdminClientWithPoliciesIntegrationTest { - - val validations = new mutable.ListBuffer[AlterConfigPolicy.RequestMetadata]() - - def validationsForResource(resource: ConfigResource): Seq[AlterConfigPolicy.RequestMetadata] = { - validations.filter { req => req.resource().equals(resource) }.toSeq - } - - class Policy extends AlterConfigPolicy { - - var configs: Map[String, _] = _ - var closed = false - - def configure(configs: util.Map[String, _]): Unit = { - validations.clear() - this.configs = configs.asScala.toMap - } - - def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = { - validations.append(requestMetadata) - require(!closed, "Policy should not be closed") - require(configs.nonEmpty, "configure should have been called with non empty configs") - require(!requestMetadata.configs.isEmpty, "request configs should not be empty") - require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty") - if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) - throw new PolicyViolationException("Min in sync replicas cannot be updated") - } - - def close(): Unit = closed = true - - } -} diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 7662eeda7a3..471692d0016 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -273,12 +273,7 @@ public interface ClusterInstance { broker -> broker.metadataCache().numPartitions(topic).isEmpty()), 60000L, topic + " metadata not propagated after 60000 ms"); - for (ControllerServer controller : controllers().values()) { - long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1; - TestUtils.waitForCondition( - () -> 
brokers.stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset), - 60000L, "Timeout waiting for controller metadata propagating to brokers"); - } + ensureConsistentMetadata(brokers, controllers().values()); TopicPartition topicPartition = new TopicPartition(topic, 0); @@ -358,7 +353,15 @@ public interface ClusterInstance { () -> brokers.stream().allMatch(broker -> broker.metadataCache().numPartitions(topic).filter(p -> p == partitions).isPresent()), 60000L, topic + " metadata not propagated after 60000 ms"); - for (ControllerServer controller : controllers().values()) { + ensureConsistentMetadata(brokers, controllers().values()); + } + + default void ensureConsistentMetadata() throws InterruptedException { + ensureConsistentMetadata(aliveBrokers().values(), controllers().values()); + } + + default void ensureConsistentMetadata(Collection<KafkaBroker> brokers, Collection<ControllerServer> controllers) throws InterruptedException { + for (ControllerServer controller : controllers) { long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1; TestUtils.waitForCondition( () -> brokers.stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),