This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dbc7787fcd7 KAFKA-19590 Add prefix to
TopicBasedRemoteLogMetadataManagerConfig to enable setting admin configs
(#21277)
dbc7787fcd7 is described below
commit dbc7787fcd760373f370d84b156383bc0a117ebe
Author: Lan Ding <[email protected]>
AuthorDate: Tue Jan 13 22:49:28 2026 +0800
KAFKA-19590 Add prefix to TopicBasedRemoteLogMetadataManagerConfig to
enable setting admin configs (#21277)
Currently, `TopicBasedRemoteLogMetadataManager` creates the Admin client
using `commonProperties()`, which prevents users from setting
admin-specific configurations (e.g., custom timeouts, retry policies).
This commit adds a new configuration prefix
`remote.log.metadata.admin.*` to allow separate configuration for the
Admin client, similar to the existing producer and consumer prefixes.
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
docs/configuration/tiered-storage-configs.md | 3 ++
docs/getting-started/upgrade.md | 2 +-
.../TopicBasedRemoteLogMetadataManager.java | 2 +-
.../TopicBasedRemoteLogMetadataManagerConfig.java | 25 ++++++++++++--
...picBasedRemoteLogMetadataManagerConfigTest.java | 38 ++++++++++++++++++----
5 files changed, 59 insertions(+), 11 deletions(-)
diff --git a/docs/configuration/tiered-storage-configs.md
b/docs/configuration/tiered-storage-configs.md
index ad1e6f43d06..ac6041b660b 100644
--- a/docs/configuration/tiered-storage-configs.md
+++ b/docs/configuration/tiered-storage-configs.md
@@ -43,6 +43,9 @@ Additional configurations can be provided for different types
of clients using t
# Configs for admin, producer, and consumer clients
<rlmm.prefix>.remote.log.metadata.common.client.<kafka.property> = <value>
+
+ # Configs only for admin client
+ <rlmm.prefix>.remote.log.metadata.admin.<kafka.property> = <value>
# Configs only for producer client
<rlmm.prefix>.remote.log.metadata.producer.<kafka.property> = <value>
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index ee9a890a05c..24ced55efcb 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -35,7 +35,7 @@ type: docs
* Support dynamically changing configs for dynamic quorum controllers.
Previously only brokers and static quorum controllers were supported. For
further details, please refer to
[KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928).
* Two new configs have been introduced:
`group.coordinator.cached.buffer.max.bytes` and
`share.coordinator.cached.buffer.max.bytes`. They allow the respective
coordinators to set the maximum buffer size retained for reuse. For further
details, please refer to
[KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
* The new config have been introduced: `remote.log.metadata.topic.min.isr`
with 2 as default value. You can correct the min.insync.replicas for the
existed __remote_log_metadata topic via kafka-configs.sh if needed. For further
details, please refer to
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
-
+ * The new config prefix `remote.log.metadata.admin.` has been introduced. It
allows independent configuration of the admin client used by
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
## Upgrading to 4.2.0
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index cd2ed674216..00f20fb4f14 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -323,7 +323,7 @@ public class TopicBasedRemoteLogMetadataManager implements
BrokerReadyCallback,
boolean isTopicCreated = false;
long startTimeMs = time.milliseconds();
boolean initializationFailed = false;
- try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
+ try (Admin admin = Admin.create(rlmmConfig.adminProperties())) {
while (!(initialized.get() || closing.get() ||
initializationFailed)) {
if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
log.error("Timed out to initialize the resources within {}
ms.", retryMaxTimeoutMs);
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 52f934ce299..29f117cd54e 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -80,6 +80,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX =
"remote.log.metadata.common.client.";
public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX =
"remote.log.metadata.producer.";
public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX =
"remote.log.metadata.consumer.";
+ public static final String REMOTE_LOG_METADATA_ADMIN_PREFIX =
"remote.log.metadata.admin.";
public static final String BROKER_ID = "broker.id";
public static final String LOG_DIR = "log.dir";
@@ -118,6 +119,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
private Map<String, Object> commonProps;
private Map<String, Object> consumerProps;
private Map<String, Object> producerProps;
+ private Map<String, Object> adminProps;
public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
Objects.requireNonNull(props, "props can not be null");
@@ -137,13 +139,14 @@ public final class
TopicBasedRemoteLogMetadataManagerConfig {
initializationRetryIntervalMs = (long)
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
initializationRetryMaxTimeoutMs = (long)
parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
clientIdPrefix = REMOTE_LOG_METADATA_CLIENT_PREFIX + "_" +
props.get(BROKER_ID);
- initializeProducerConsumerProperties(props);
+ initializeClientProperties(props);
}
- private void initializeProducerConsumerProperties(Map<String, ?> configs) {
+ private void initializeClientProperties(Map<String, ?> configs) {
Map<String, Object> commonClientConfigs = new HashMap<>();
Map<String, Object> producerOnlyConfigs = new HashMap<>();
Map<String, Object> consumerOnlyConfigs = new HashMap<>();
+ Map<String, Object> adminOnlyConfigs = new HashMap<>();
for (Map.Entry<String, ?> entry : configs.entrySet()) {
String key = entry.getKey();
if (key.startsWith(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX)) {
@@ -152,6 +155,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
producerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_PRODUCER_PREFIX.length()),
entry.getValue());
} else if (key.startsWith(REMOTE_LOG_METADATA_CONSUMER_PREFIX)) {
consumerOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_CONSUMER_PREFIX.length()),
entry.getValue());
+ } else if (key.startsWith(REMOTE_LOG_METADATA_ADMIN_PREFIX)) {
+
adminOnlyConfigs.put(key.substring(REMOTE_LOG_METADATA_ADMIN_PREFIX.length()),
entry.getValue());
}
}
commonProps = new HashMap<>(commonClientConfigs);
@@ -161,6 +166,9 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
Map<String, Object> allConsumerConfigs = new
HashMap<>(commonClientConfigs);
allConsumerConfigs.putAll(consumerOnlyConfigs);
consumerProps = createConsumerProps(allConsumerConfigs);
+ Map<String, Object> allAdminConfigs = new
HashMap<>(commonClientConfigs);
+ allAdminConfigs.putAll(adminOnlyConfigs);
+ adminProps = createAdminProps(allAdminConfigs);
}
public String remoteLogMetadataTopicName() {
@@ -199,6 +207,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
return logDir;
}
+ // Used for testing
public Map<String, Object> commonProperties() {
return commonProps;
}
@@ -211,6 +220,10 @@ public final class
TopicBasedRemoteLogMetadataManagerConfig {
return producerProps;
}
+ public Map<String, Object> adminProperties() {
+ return adminProps;
+ }
+
private Map<String, Object> createConsumerProps(Map<String, Object>
allConsumerConfigs) {
Map<String, Object> props = new HashMap<>(allConsumerConfigs);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientIdPrefix +
"_consumer");
@@ -232,6 +245,12 @@ public final class
TopicBasedRemoteLogMetadataManagerConfig {
return Collections.unmodifiableMap(props);
}
+ private Map<String, Object> createAdminProps(Map<String, Object>
allAdminConfigs) {
+ Map<String, Object> props = new HashMap<>(allAdminConfigs);
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, clientIdPrefix +
"_admin");
+ return Collections.unmodifiableMap(props);
+ }
+
@Override
public String toString() {
return "TopicBasedRemoteLogMetadataManagerConfig{" +
@@ -243,9 +262,9 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
", metadataTopicMinIsr=" + metadataTopicMinIsr +
", initializationRetryMaxTimeoutMs=" +
initializationRetryMaxTimeoutMs +
", initializationRetryIntervalMs=" +
initializationRetryIntervalMs +
- ", commonProps=" + configMapToRedactedString(commonProps,
AdminClientConfig.configDef()) +
", consumerProps=" + configMapToRedactedString(consumerProps,
ConsumerConfig.configDef()) +
", producerProps=" + configMapToRedactedString(producerProps,
ProducerConfig.configDef()) +
+ ", adminProps=" + configMapToRedactedString(adminProps,
AdminClientConfig.configDef()) +
'}';
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index e3eb05b72b9..bad522ff51e 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
@@ -34,6 +35,7 @@ import java.util.Map;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_ADMIN_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -60,7 +62,10 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig);
+ Map<String, Object> adminConfig = new HashMap<>();
+ adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100);
+
+ Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig, adminConfig);
// Check for topic properties
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
@@ -70,11 +75,13 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+ assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.adminProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
assertEquals(entry.getValue(),
rlmmConfig.commonProperties().get(entry.getKey()));
assertEquals(entry.getValue(),
rlmmConfig.producerProperties().get(entry.getKey()));
assertEquals(entry.getValue(),
rlmmConfig.consumerProperties().get(entry.getKey()));
+ assertEquals(entry.getValue(),
rlmmConfig.adminProperties().get(entry.getKey()));
}
// Check for producer configs.
for (Map.Entry<String, Object> entry : producerConfig.entrySet()) {
@@ -84,10 +91,14 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
assertEquals(entry.getValue(),
rlmmConfig.consumerProperties().get(entry.getKey()));
}
+ // Check for admin configs.
+ for (Map.Entry<String, Object> entry : adminConfig.entrySet()) {
+ assertEquals(entry.getValue(),
rlmmConfig.adminProperties().get(entry.getKey()));
+ }
}
@Test
- public void testCommonProducerConsumerOverridesConfig() {
+ public void testCommonClientOverridesConfig() {
Map.Entry<String, Long> overrideEntry =
new
AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
60000L);
Map<String, Object> commonClientConfig = new HashMap<>();
@@ -106,12 +117,17 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest
{
Long overriddenConsumerPropValue = overrideEntry.getValue() * 3;
consumerConfig.put(overrideEntry.getKey(),
overriddenConsumerPropValue);
- Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig);
+ Map<String, Object> adminConfig = new HashMap<>();
+ Long overriddenAdminPropValue = overrideEntry.getValue() * 4;
+ adminConfig.put(overrideEntry.getKey(), overriddenAdminPropValue);
+
+ Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig, adminConfig);
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
assertEquals(overrideCommonPropValue,
rlmmConfig.commonProperties().get(overrideEntry.getKey()));
assertEquals(overriddenProducerPropValue,
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
assertEquals(overriddenConsumerPropValue,
rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
+ assertEquals(overriddenAdminPropValue,
rlmmConfig.adminProperties().get(overrideEntry.getKey()));
}
@Test
@@ -130,7 +146,11 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
addPasswordTypeConfigurationProperties(consumerConfig);
- Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig);
+ Map<String, Object> adminConfig = new HashMap<>();
+ adminConfig.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100);
+ addPasswordTypeConfigurationProperties(adminConfig);
+
+ Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig, adminConfig);
// Check for topic properties
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
@@ -141,15 +161,17 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest
{
assertTrue(configString.contains("retries=10"));
assertTrue(configString.contains("acks=\"all\""));
assertTrue(configString.contains("enable.auto.commit=false"));
+ assertTrue(configString.contains("reconnect.backoff.ms=100"));
}
private Map<String, Object> createValidConfigProps() {
- return this.createValidConfigProps(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap());
+ return this.createValidConfigProps(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
}
private Map<String, Object> createValidConfigProps(Map<String, Object>
commonClientConfig,
Map<String, Object>
producerConfig,
- Map<String, Object>
consumerConfig) {
+ Map<String, Object>
consumerConfig,
+ Map<String, Object>
adminConfig) {
Map<String, Object> props = new HashMap<>();
props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(BROKER_ID, 1);
@@ -170,6 +192,10 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
for (Map.Entry<String, Object> entry : consumerConfig.entrySet()) {
props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(),
entry.getValue());
}
+ // admin configs
+ for (Map.Entry<String, Object> entry : adminConfig.entrySet()) {
+ props.put(REMOTE_LOG_METADATA_ADMIN_PREFIX + entry.getKey(),
entry.getValue());
+ }
return props;
}