This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c1c88968adc KAFKA-19984 Throttle-related dynamic configurations should
not have the `isReadOnly` flag (#21131)
c1c88968adc is described below
commit c1c88968adcee70d308121630450926349ef4276
Author: Ken Huang <[email protected]>
AuthorDate: Tue Feb 10 08:33:49 2026 +0800
KAFKA-19984 Throttle-related dynamic configurations should not have the
`isReadOnly` flag (#21131)
The following configs are reported as READONLY even though their values
can be changed dynamically:
- leader.replication.throttled.rate
- follower.replication.throttled.rate
- replica.alter.log.dirs.io.max.bytes.per.second
Add them to `DynamicBrokerConfig` so that describing these configs no
longer reports them as READONLY.
Reviewers: Lan Ding <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../apache/kafka/server/config/QuotaConfig.java | 2 ++
.../kafka/server/config/DynamicBrokerConfig.java | 3 ++-
.../BootstrapControllersIntegrationTest.java | 27 ++++++++++++++++++++++
3 files changed, 31 insertions(+), 1 deletion(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
index 8e89d1a28c0..27a1c46bdad 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java
@@ -225,6 +225,8 @@ public class QuotaConfig {
ConfigDef.Importance.MEDIUM,
QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC);
}
+ public static final Set<String> BROKER_QUOTA_CONFIGS =
Set.copyOf(brokerQuotaConfigs().names());
+
public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
index a45c65a9b9b..7d26793b9c2 100644
---
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
@@ -79,7 +79,8 @@ public class DynamicBrokerConfig {
DynamicReplicationConfig.RECONFIGURABLE_CONFIGS,
List.of(AbstractConfig.CONFIG_PROVIDERS_CONFIG),
GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS,
- ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS)
+ ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS,
+ QuotaConfig.BROKER_QUOTA_CONFIGS)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
diff --git
a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
index c889ba74d9f..1ca1f6924b5 100644
---
a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java
@@ -61,6 +61,7 @@ import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Timeout;
@@ -72,6 +73,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
import static
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
@@ -81,6 +83,7 @@ import static
org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static
org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -429,4 +432,28 @@ public class BootstrapControllersIntegrationTest {
public void
testIncrementalAlterConfigsByAllControllersWithDynamicQuorum(ClusterInstance
clusterInstance) throws Exception {
testIncrementalAlterConfigs(clusterInstance, true);
}
+
+ @ClusterTest
+ public void testQuotaConfigsIsReadOnlyShouldBeFalse(ClusterInstance
clusterInstance) throws Exception {
+ try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
+ int nodeId =
clusterInstance.controllers().values().iterator().next().config().nodeId();
+ ConfigResource nodeResource = new ConfigResource(BROKER, "" +
nodeId);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterations =
Map.of(
+ nodeResource, List.of(
+ new AlterConfigOp(new
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG,
"16800"), AlterConfigOp.OpType.SET)
+ ));
+ admin.incrementalAlterConfigs(alterations).all().get(1,
TimeUnit.MINUTES);
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ Config config = admin.describeConfigs(List.of(nodeResource)).
+ all().get(1, TimeUnit.MINUTES).get(nodeResource);
+ Map<String, ConfigEntry> configEntries =
config.entries().stream()
+ .collect(Collectors.toMap(ConfigEntry::name, e -> e));
+
assertFalse(configEntries.get(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
+
assertFalse(configEntries.get(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
+
assertFalse(configEntries.get(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).isReadOnly());
+ });
+ }
+ }
}