This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dbc7787fcd7 KAFKA-19590 Add prefix to 
TopicBasedRemoteLogMetadataManagerConfig to enable setting admin configs 
(#21277)
dbc7787fcd7 is described below

commit dbc7787fcd760373f370d84b156383bc0a117ebe
Author: Lan Ding <[email protected]>
AuthorDate: Tue Jan 13 22:49:28 2026 +0800

    KAFKA-19590 Add prefix to TopicBasedRemoteLogMetadataManagerConfig to 
enable setting admin configs (#21277)
    
    Currently, `TopicBasedRemoteLogMetadataManager` creates the Admin client
    using `commonProperties()`, which prevents users from setting
    admin-specific configurations (e.g., custom timeouts, retry policies).
    
    This commit adds a new configuration prefix
    `remote.log.metadata.admin.*` to allow separate configuration for the
    Admin client, similar to the existing producer and consumer prefixes.
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 docs/configuration/tiered-storage-configs.md       |  3 ++
 docs/getting-started/upgrade.md                    |  2 +-
 .../TopicBasedRemoteLogMetadataManager.java        |  2 +-
 .../TopicBasedRemoteLogMetadataManagerConfig.java  | 25 ++++++++++++--
 ...picBasedRemoteLogMetadataManagerConfigTest.java | 38 ++++++++++++++++++----
 5 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/docs/configuration/tiered-storage-configs.md 
b/docs/configuration/tiered-storage-configs.md
index ad1e6f43d06..ac6041b660b 100644
--- a/docs/configuration/tiered-storage-configs.md
+++ b/docs/configuration/tiered-storage-configs.md
@@ -43,6 +43,9 @@ Additional configurations can be provided for different types 
of clients using t
     
     # Configs for admin, producer, and consumer clients
     <rlmm.prefix>.remote.log.metadata.common.client.<kafka.property> = <value>
+
+    # Configs only for admin client
+    <rlmm.prefix>.remote.log.metadata.admin.<kafka.property> = <value>
     
     # Configs only for producer client
     <rlmm.prefix>.remote.log.metadata.producer.<kafka.property> = <value>
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index ee9a890a05c..24ced55efcb 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -35,7 +35,7 @@ type: docs
   * Support dynamically changing configs for dynamic quorum controllers. 
Previously only brokers and static quorum controllers were supported. For 
further details, please refer to 
[KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928). 
   * Two new configs have been introduced: 
`group.coordinator.cached.buffer.max.bytes` and 
`share.coordinator.cached.buffer.max.bytes`. They allow the respective 
coordinators to set the maximum buffer size retained for reuse. For further 
details, please refer to 
[KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg). 
   * The new config have been introduced: `remote.log.metadata.topic.min.isr` 
with 2 as default value. You can correct the min.insync.replicas for the 
existed __remote_log_metadata topic via kafka-configs.sh if needed. For further 
details, please refer to 
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
-
+  * The new config prefix `remote.log.metadata.admin.` has been introduced. It 
allows independent configuration of the admin client used by 
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to 
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
 
 ## Upgrading to 4.2.0
 
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index cd2ed674216..00f20fb4f14 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -323,7 +323,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
BrokerReadyCallback,
         boolean isTopicCreated = false;
         long startTimeMs = time.milliseconds();
         boolean initializationFailed = false;
-        try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
+        try (Admin admin = Admin.create(rlmmConfig.adminProperties())) {
             while (!(initialized.get() || closing.get() || 
initializationFailed)) {
                 if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
                     log.error("Timed out to initialize the resources within {} 
ms.", retryMaxTimeoutMs);
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 52f934ce299..29f117cd54e 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -80,6 +80,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
     public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = 
"remote.log.metadata.common.client.";
     public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = 
"remote.log.metadata.producer.";
     public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = 
"remote.log.metadata.consumer.";
+    public static final String REMOTE_LOG_METADATA_ADMIN_PREFIX = 
"remote.log.metadata.admin.";
     public static final String BROKER_ID = "broker.id";
     public static final String LOG_DIR = "log.dir";
 
@@ -118,6 +119,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
     private Map<String, Object> commonProps;
     private Map<String, Object> consumerProps;
     private Map<String, Object> producerProps;
+    private Map<String, Object> adminProps;
 
     public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
         Objects.requireNonNull(props, "props can not be null");
@@ -137,13 +139,14 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
         initializationRetryIntervalMs = (long) 
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
         initializationRetryMaxTimeoutMs = (long) 
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
         clientIdPrefix = REMOTE_LOG_METADATA_CLIENT_PREFIX + "_" + 
props.get(BROKER_ID);
-        initializeProducerConsumerProperties(props);
+        initializeClientProperties(props);
     }
 
-    private void initializeProducerConsumerProperties(Map<String, ?> configs) {
+    private void initializeClientProperties(Map<String, ?> configs) {
         Map<String, Object> commonClientConfigs = new HashMap<>();
         Map<String, Object> producerOnlyConfigs = new HashMap<>();
         Map<String, Object> consumerOnlyConfigs = new HashMap<>();
+        Map<String, Object> adminOnlyConfigs = new HashMap<>();
         for (Map.Entry<String, ?> entry : configs.entrySet()) {
             String key = entry.getKey();
             if (key.startsWith(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX)) {
@@ -152,6 +155,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
                 
producerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_PRODUCER_PREFIX.length()),
 entry.getValue());
             } else if (key.startsWith(REMOTE_LOG_METADATA_CONSUMER_PREFIX)) {
                 
consumerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_CONSUMER_PREFIX.length()),
 entry.getValue());
+            } else if (key.startsWith(REMOTE_LOG_METADATA_ADMIN_PREFIX)) {
+                
adminOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_ADMIN_PREFIX.length()), 
entry.getValue());
             }
         }
         commonProps = new HashMap<>(commonClientConfigs);
@@ -161,6 +166,9 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
         Map<String, Object> allConsumerConfigs = new 
HashMap<>(commonClientConfigs);
         allConsumerConfigs.putAll(consumerOnlyConfigs);
         consumerProps = createConsumerProps(allConsumerConfigs);
+        Map<String, Object> allAdminConfigs = new 
HashMap<>(commonClientConfigs);
+        allAdminConfigs.putAll(adminOnlyConfigs);
+        adminProps = createAdminProps(allAdminConfigs);
     }
 
     public String remoteLogMetadataTopicName() {
@@ -199,6 +207,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
         return logDir;
     }
 
+    // Used for testing
     public Map<String, Object> commonProperties() {
         return commonProps;
     }
@@ -211,6 +220,10 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
         return producerProps;
     }
 
+    public Map<String, Object> adminProperties() {
+        return adminProps;
+    }
+
     private Map<String, Object> createConsumerProps(Map<String, Object> 
allConsumerConfigs) {
         Map<String, Object> props = new HashMap<>(allConsumerConfigs);
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientIdPrefix + 
"_consumer");
@@ -232,6 +245,12 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
         return Collections.unmodifiableMap(props);
     }
 
+    private Map<String, Object> createAdminProps(Map<String, Object> 
allAdminConfigs) {
+        Map<String, Object> props = new HashMap<>(allAdminConfigs);
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, clientIdPrefix + 
"_admin");
+        return Collections.unmodifiableMap(props);
+    }
+
     @Override
     public String toString() {
         return "TopicBasedRemoteLogMetadataManagerConfig{" +
@@ -243,9 +262,9 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
                 ", metadataTopicMinIsr=" + metadataTopicMinIsr +
                 ", initializationRetryMaxTimeoutMs=" + 
initializationRetryMaxTimeoutMs +
                 ", initializationRetryIntervalMs=" + 
initializationRetryIntervalMs +
-                ", commonProps=" + configMapToRedactedString(commonProps, 
AdminClientConfig.configDef()) +
                 ", consumerProps=" + configMapToRedactedString(consumerProps, 
ConsumerConfig.configDef()) +
                 ", producerProps=" + configMapToRedactedString(producerProps, 
ProducerConfig.configDef()) +
+                ", adminProps=" + configMapToRedactedString(adminProps, 
AdminClientConfig.configDef()) +
                 '}';
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index e3eb05b72b9..bad522ff51e 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
@@ -34,6 +35,7 @@ import java.util.Map;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_ADMIN_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -60,7 +62,10 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         Map<String, Object> consumerConfig = new HashMap<>();
         consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
-        Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig);
+        Map<String, Object> adminConfig = new HashMap<>();
+        adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100);
+
+        Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig, adminConfig);
 
         // Check for topic properties
         TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
@@ -70,11 +75,13 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
         assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
         assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.adminProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
 
         for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
             assertEquals(entry.getValue(), 
rlmmConfig.commonProperties().get(entry.getKey()));
             assertEquals(entry.getValue(), 
rlmmConfig.producerProperties().get(entry.getKey()));
             assertEquals(entry.getValue(), 
rlmmConfig.consumerProperties().get(entry.getKey()));
+            assertEquals(entry.getValue(), 
rlmmConfig.adminProperties().get(entry.getKey()));
         }
         // Check for producer configs.
         for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
@@ -84,10 +91,14 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
             assertEquals(entry.getValue(), 
rlmmConfig.consumerProperties().get(entry.getKey()));
         }
+        // Check for admin configs.
+        for (Map.Entry<String, Object> entry : adminConfig.entrySet()) {
+            assertEquals(entry.getValue(), 
rlmmConfig.adminProperties().get(entry.getKey()));
+        }
     }
 
     @Test
-    public void testCommonProducerConsumerOverridesConfig() {
+    public void testCommonClientOverridesConfig() {
         Map.Entry<String, Long> overrideEntry =
                 new 
AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 
60000L);
         Map<String, Object> commonClientConfig = new HashMap<>();
@@ -106,12 +117,17 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest 
{
         Long overriddenConsumerPropValue = overrideEntry.getValue() * 3;
         consumerConfig.put(overrideEntry.getKey(), 
overriddenConsumerPropValue);
 
-        Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig);
+        Map<String, Object> adminConfig = new HashMap<>();
+        Long overriddenAdminPropValue = overrideEntry.getValue() * 4;
+        adminConfig.put(overrideEntry.getKey(), overriddenAdminPropValue);
+
+        Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig, adminConfig);
         TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
 
         assertEquals(overrideCommonPropValue, 
rlmmConfig.commonProperties().get(overrideEntry.getKey()));
         assertEquals(overriddenProducerPropValue, 
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
         assertEquals(overriddenConsumerPropValue, 
rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
+        assertEquals(overriddenAdminPropValue, 
rlmmConfig.adminProperties().get(overrideEntry.getKey()));
     }
 
     @Test
@@ -130,7 +146,11 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         addPasswordTypeConfigurationProperties(consumerConfig);
 
-        Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig);
+        Map<String, Object> adminConfig = new HashMap<>();
+        adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100);
+        addPasswordTypeConfigurationProperties(adminConfig);
+
+        Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig, adminConfig);
 
         // Check for topic properties
         TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
@@ -141,15 +161,17 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest 
{
         assertTrue(configString.contains("retries=10"));
         assertTrue(configString.contains("acks=\"all\""));
         assertTrue(configString.contains("enable.auto.commit=false"));
+        assertTrue(configString.contains("reconnect.backoff.ms=100"));
     }
 
     private Map<String, Object> createValidConfigProps() {
-        return this.createValidConfigProps(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap());
+        return this.createValidConfigProps(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
     }
 
     private Map<String, Object> createValidConfigProps(Map<String, Object> 
commonClientConfig,
                                                        Map<String, Object> 
producerConfig,
-                                                       Map<String, Object> 
consumerConfig) {
+                                                       Map<String, Object> 
consumerConfig,
+                                                       Map<String, Object> 
adminConfig) {
         Map<String, Object> props = new HashMap<>();
         props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
         props.put(BROKER_ID, 1);
@@ -170,6 +192,10 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
             props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), 
entry.getValue());
         }
+        // admin configs
+        for (Map.Entry<String, Object> entry : adminConfig.entrySet()) {
+            props.put(REMOTE_LOG_METADATA_ADMIN_PREFIX + entry.getKey(), 
entry.getValue());
+        }
         return props;
     }
 

Reply via email to