This is an automated email from the ASF dual-hosted git repository.
squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a7dfb89db47 KAFKA-20409: Don't expose internal group configs unless
they are user-defined (#22302)
a7dfb89db47 is described below
commit a7dfb89db47fb7ec3503981057fab54f8c078d15
Author: majialong <[email protected]>
AuthorDate: Thu May 21 15:04:25 2026 +0800
KAFKA-20409: Don't expose internal group configs unless they are
user-defined (#22302)
Previously, internal group configs that have a broker-level synonym were
always included in the group config map, exposing them even when the
user had never explicitly set them. This change omits such configs
unless the user has configured them, either via the broker-level synonym
or directly at the group level.
Reviewers: Sean Quah <[email protected]>
---
.../kafka/coordinator/group/GroupConfig.java | 9 ++++
.../kafka/server/config/AbstractKafkaConfig.java | 17 +++++-
.../server/config/AbstractKafkaConfigTest.java | 60 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 2 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index f74e7a0a5d3..100bbc0c9b8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -343,6 +343,15 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG,
Optional.empty())
);
+ /**
+ * Returns {@code true} if the given config name is defined as internal in
{@link #CONFIG_DEF}.
+ * Returns {@code false} for unknown names or non-internal configs.
+ */
+ public static boolean isInternal(String configName) {
+ ConfigDef.ConfigKey configKey =
CONFIG_DEF.configKeys().get(configName);
+ return configKey != null && configKey.internalConfig;
+ }
+
/**
* Returns the broker-level synonym config name for the given group config
name,
* or {@code Optional.empty()} if no broker-level synonym exists.
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index ac5e33dc70b..f324c9c1974 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -632,10 +632,23 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
return millis < 0 ? Long.valueOf(-1) : millis;
}
+ /**
+ * Returns a map of group config names to their broker-level synonym
values, used as
+ * defaults when building a {@link GroupConfig} for {@code
DescribeConfigs}.
+ * Internal group configs are excluded unless their broker synonym was
explicitly configured.
+ *
+ * @return a map of group config names to their corresponding broker-level
values
+ */
public Map<String, Object> extractGroupConfigMap() {
Map<String, Object> defaults = new HashMap<>();
- GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName,
brokerConfigName) ->
- brokerConfigName.ifPresent(name -> defaults.put(groupConfigName,
get(name)))
+ Map<String, Object> brokerOriginals = originals();
+ GroupConfig.configNames().forEach(groupConfigName ->
+
GroupConfig.brokerSynonym(groupConfigName).ifPresent(brokerConfigName -> {
+ // Skip internal configs unless they are explicitly configured
via the broker synonym.
+ if (!GroupConfig.isInternal(groupConfigName) ||
brokerOriginals.containsKey(brokerConfigName)) {
+ defaults.put(groupConfigName, get(brokerConfigName));
+ }
+ })
);
return defaults;
}
diff --git
a/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
b/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
index 3600d80253b..d7243c6192e 100644
---
a/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
+++
b/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
@@ -16,18 +16,31 @@
*/
package org.apache.kafka.server.config;
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.raft.KRaftConfigs;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mockStatic;
public class AbstractKafkaConfigTest {
+ private static final String TEST_INTERNAL_GROUP_CONFIG =
"group.test.internal.config";
+ private static final String TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM =
"group.test.internal.config.broker.synonym";
+
@Test
public void testPopulateSynonymsOnEmptyMap() {
assertEquals(Collections.emptyMap(),
AbstractKafkaConfig.populateSynonyms(Collections.emptyMap()));
@@ -52,4 +65,51 @@ public class AbstractKafkaConfigTest {
expectedOutput.put(KRaftConfigs.NODE_ID_CONFIG, "4");
assertEquals(expectedOutput,
AbstractKafkaConfig.populateSynonyms(input));
}
+
+ @Test
+ public void
testExtractGroupConfigMapExcludesInternalConfigWithUnconfiguredBrokerSynonym() {
+ Map<String, Object> config = extractGroupConfigMap(Map.of(), true);
+
+ assertFalse(config.containsKey(TEST_INTERNAL_GROUP_CONFIG));
+ }
+
+ @Test
+ public void
testExtractGroupConfigMapIncludesInternalConfigWithConfiguredBrokerSynonym() {
+ Map<String, Object> config =
extractGroupConfigMap(Map.of(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM,
"override-value"), true);
+
+ assertTrue(config.containsKey(TEST_INTERNAL_GROUP_CONFIG));
+ assertEquals("override-value", config.get(TEST_INTERNAL_GROUP_CONFIG));
+ }
+
+ @Test
+ public void testExtractGroupConfigMapIncludesNonInternalConfig() {
+ Map<String, Object> config = extractGroupConfigMap(Map.of(), false);
+
+ assertTrue(config.containsKey(TEST_INTERNAL_GROUP_CONFIG));
+ assertEquals("default-value", config.get(TEST_INTERNAL_GROUP_CONFIG));
+ }
+
+ private static Map<String, Object> extractGroupConfigMap(Map<String,
Object> brokerProps, boolean isInternal) {
+ try (MockedStatic<GroupConfig> mocked = mockStatic(GroupConfig.class,
Mockito.CALLS_REAL_METHODS)) {
+
+ // mock group config
+
mocked.when(GroupConfig::configNames).thenReturn(Set.of(TEST_INTERNAL_GROUP_CONFIG));
+ mocked.when(() ->
GroupConfig.brokerSynonym(TEST_INTERNAL_GROUP_CONFIG)).thenReturn(Optional.of(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM));
+ mocked.when(() ->
GroupConfig.isInternal(TEST_INTERNAL_GROUP_CONFIG)).thenReturn(isInternal);
+
+ // mock broker synonym config
+ ConfigDef configDef = new
ConfigDef().define(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM,
ConfigDef.Type.STRING,
+ "default-value", ConfigDef.Importance.LOW, "test broker
synonym");
+
+ AbstractKafkaConfig kafkaConfig = new
AbstractKafkaConfig(configDef, new HashMap<>(brokerProps), Map.of(), false) {
+ @Override
+ public void addReconfigurable(Reconfigurable reconfigurable) {
}
+
+ @Override
+ public void removeReconfigurable(Reconfigurable
reconfigurable) { }
+ };
+
+ return kafkaConfig.extractGroupConfigMap();
+ }
+ }
}