This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a6a0ea56d89 KAFKA-17171 Add test cases for `STATIC_BROKER_CONFIG`in kraft mode (#18463) a6a0ea56d89 is described below commit a6a0ea56d89c811478fb18cdf9f9a4d19ea6fb88 Author: TengYao Chi <kiting...@gmail.com> AuthorDate: Tue Mar 18 00:30:53 2025 +0800 KAFKA-17171 Add test cases for `STATIC_BROKER_CONFIG`in kraft mode (#18463) Given that the `core` module will be separated into other small modules, this test will not be added to the core module. Instead, I added it to the `clients-integration-tests` module since it focuses on the admin client test. The patch should include following test cases. 1. a topic-related static config is added to quorum controller. The configs from topic creation should include it, but `describeConfigs` does not. 2. a topic-related static config is added to quorum controller. The configs from topic creation should include it, and `describeConfigs` does if admin is using controller.bootstrap 3. a topic-related static config is added to broker. The configs from topic creation should NOT include it, but `describeConfigs` does. 4. a topic-related static config is added to broker. The configs from topic creation should NOT include it, and `describeConfigs` does not also if admin is using controller.bootstrap for another, the docs of `STATIC_BROKER_CONFIG` should remind the impact of "controller.properties" BTW, those test cases should leverage new test infra, since new test infra allow us to define configs to broker/controller individually. Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../clients/admin/StaticBrokerConfigTest.java | 115 +++++++++++++++++++++ .../BootstrapControllersIntegrationTest.java | 21 ++++ 2 files changed, 136 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java new file mode 100644 index 00000000000..9e56e6bd3e6 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +@ExtendWith(value = ClusterTestExtensions.class) +public class StaticBrokerConfigTest { + private static final String TOPIC = "topic"; + private static final String CUSTOM_VALUE = "1048576"; + + /** + * synonyms of `segment.bytes` + */ + private static final String LOG_SEGMENT_BYTES = "log.segment.bytes"; + + @ClusterTest(types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(id = 3000, key = LOG_SEGMENT_BYTES, value = CUSTOM_VALUE) + }) + public void testTopicConfigsGetImpactedIfStaticConfigsAddToController(ClusterInstance cluster) + throws ExecutionException, InterruptedException { + try ( + Admin admin = cluster.admin(); + Admin adminUsingBootstrapController = cluster.admin(Map.of(), true) + ) { + ConfigEntry configEntry = admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 1))) + .config(TOPIC).get().get(TopicConfig.SEGMENT_BYTES_CONFIG); + assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, configEntry.source()); + assertEquals(CUSTOM_VALUE, configEntry.value(), "Config value should be custom value since controller has related static config"); + + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + configEntry = admin.describeConfigs(List.of(brokerResource)).all().get().get(brokerResource).get(LOG_SEGMENT_BYTES); + assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, configEntry.source()); + assertNotEquals(CUSTOM_VALUE, + configEntry.value(), + "Config value should not be custom value since broker doesn't have related static config"); + + ConfigResource controllerResource = new ConfigResource(ConfigResource.Type.BROKER, "3000"); + configEntry = adminUsingBootstrapController.describeConfigs(List.of(controllerResource)) + .all().get().get(controllerResource).get(LOG_SEGMENT_BYTES); + assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, configEntry.source()); + assertEquals(CUSTOM_VALUE, + configEntry.value(), + "Config value should be custom value since controller has related static config"); + } + } + + @ClusterTest(types = {Type.KRAFT}, + serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(id = 0, key = LOG_SEGMENT_BYTES, value = CUSTOM_VALUE) + }) + public void testTopicConfigsGetImpactedIfStaticConfigsAddToBroker(ClusterInstance cluster) + throws ExecutionException, InterruptedException { + try ( + Admin admin = cluster.admin(); + Admin adminUsingBootstrapController = cluster.admin(Map.of(), true) + ) { + ConfigEntry configEntry = admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) 1))) + .config(TOPIC).get().get(TopicConfig.SEGMENT_BYTES_CONFIG); + assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, configEntry.source()); + assertNotEquals(CUSTOM_VALUE, + configEntry.value(), + "Config value should not be custom value since controller doesn't have static config"); + + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + configEntry = admin.describeConfigs(List.of(brokerResource)).all().get().get(brokerResource).get(LOG_SEGMENT_BYTES); + assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, configEntry.source()); + assertEquals(CUSTOM_VALUE, + configEntry.value(), + "Config value should be custom value since broker has related static config"); + + ConfigResource controllerResource = new ConfigResource(ConfigResource.Type.BROKER, "3000"); + configEntry = adminUsingBootstrapController.describeConfigs(List.of(controllerResource)) + .all().get().get(controllerResource).get(LOG_SEGMENT_BYTES); + assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, configEntry.source()); + assertNotEquals(CUSTOM_VALUE, + configEntry.value(), + "Config value should not be custom value since controller doesn't have related static config"); + } + } +} diff --git a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java index dcee16b8ec6..17ba071beb0 100644 --- a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java +++ b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidUpdateVersionException; import org.apache.kafka.common.errors.MismatchedEndpointTypeException; import org.apache.kafka.common.errors.UnsupportedEndpointTypeException; @@ -77,6 +78,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG; +import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -342,4 +344,23 @@ public class BootstrapControllersIntegrationTest { assertEquals(aclBinding, deletedAclBindings.iterator().next()); } } + + @ClusterTest( + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, value = "2") + } + ) + public void testDescribeConfigs(ClusterInstance clusterInstance) throws Exception { + try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) { + ConfigResource resource = new ConfigResource(BROKER, ""); + Map<ConfigResource, Config> resourceToConfig = admin.describeConfigs(List.of(resource)).all().get(); + Config config = resourceToConfig.get(resource); + assertNotNull(config); + ConfigEntry configEntry = config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG); + assertEquals(DYNAMIC_DEFAULT_BROKER_CONFIG, configEntry.source()); + assertNotNull(configEntry); + assertEquals("2", configEntry.value()); + } + } }