This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e36319010c2 KAFKA-20162: Validation and enforcement of group
configurations (#21633)
e36319010c2 is described below
commit e36319010c21e5ef454062d0f2b85634c158fa21
Author: majialong <[email protected]>
AuthorDate: Wed Mar 11 03:27:51 2026 +0800
KAFKA-20162: Validation and enforcement of group configurations (#21633)
Implement KAFKA-20162 (KIP-1240) by adding broker-level bound evaluation
for dynamic group configs. Out-of-range values are capped to broker
limits with WARN logs, while Admin validation remains strict and
metadata-stored raw values are preserved.
Reviewers: Sean Quah <[email protected]>, Andrew Schofield
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../server/ControllerConfigurationValidator.scala | 4 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 46 ++++
.../kafka/coordinator/group/GroupConfig.java | 184 ++++++++++++++-
.../coordinator/group/GroupConfigManager.java | 48 ++--
.../coordinator/group/GroupCoordinatorConfig.java | 18 +-
.../group/modern/share/ShareGroupConfig.java | 28 +--
.../coordinator/group/GroupConfigManagerTest.java | 76 ++++---
.../kafka/coordinator/group/GroupConfigTest.java | 183 ++++++++++++++-
.../group/GroupMetadataManagerTest.java | 247 +++++++++++++++++++++
.../group/modern/share/ShareGroupConfigTest.java | 6 +-
.../GroupCoordinatorShardLoadingBenchmark.java | 5 +-
12 files changed, 752 insertions(+), 95 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 698f8857881..8493321641f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -374,7 +374,7 @@ class BrokerServer(
authorizerPlugin = config.createNewAuthorizer(metrics,
ProcessRole.BrokerRole.toString)
/* initializing the groupConfigManager */
- groupConfigManager = new
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
+ groupConfigManager = new
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
config.groupCoordinatorConfig, config.shareGroupConfig)
/* create share coordinator */
shareCoordinator = createShareCoordinator()
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index f163a2739ae..89fdb1d4242 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -24,7 +24,7 @@ import
org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRIC
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException}
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.coordinator.group.GroupConfigManager
+import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig
@@ -139,7 +139,7 @@ class ControllerConfigurationValidator(kafkaConfig:
KafkaConfig) extends Configu
throw new InvalidConfigurationException("Null value not supported
for group configs: " +
nullGroupConfigs.mkString(","))
}
- GroupConfigManager.validate(properties,
kafkaConfig.groupCoordinatorConfig, kafkaConfig.shareGroupConfig)
+ GroupConfig.validate(properties, kafkaConfig.groupCoordinatorConfig,
kafkaConfig.shareGroupConfig)
case _ => throwExceptionForUnknownResourceType(resource)
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 51be2b7b9f2..327b5ad2b36 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1157,6 +1157,52 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
"consumer.session.timeout.ms must be greater than or equal to
group.consumer.min.session.timeout.ms")
}
+ @Test
+ def testGroupConfigEvaluatedAfterBrokerRestart(): Unit = {
+ client = createAdminClient
+ val groupId = "evaluated-config-test-group"
+ val groupResource = new ConfigResource(ConfigResource.Type.GROUP, groupId)
+
+ // Set a valid group config (55000 is within default [45000, 60000])
+ val alterOps = util.List.of(
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "55000"),
AlterConfigOp.OpType.SET)
+ )
+ val alterResult =
client.incrementalAlterConfigs(util.Map.of(groupResource, alterOps))
+ alterResult.all.get(15, TimeUnit.SECONDS)
+ ensureConsistentKRaftMetadata()
+
+ // Verify stored value and effective value before restart
+ var describeResult = client.describeConfigs(util.List.of(groupResource))
+ var configs = describeResult.all.get(15, TimeUnit.SECONDS)
+ assertEquals("55000",
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
+ // Before restart, 55000 is within [45000, 60000], so no adjustment needed
+ assertEquals(55000,
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+
+ // Kill all brokers
+ client.close()
+ for (i <- 0 until brokerCount) {
+ killBroker(i)
+ }
+
+ // Change broker-level max to 50000 (making stored 55000 exceed the new
max)
+
serverConfig.setProperty(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
"50000")
+
+ // Restart brokers with new config (should not block startup)
+ restartDeadBrokers(reconfigure = true)
+ client = createAdminClient
+ ensureConsistentKRaftMetadata()
+
+ // Verify stored value is preserved (describeConfigs returns raw value)
+ describeResult = client.describeConfigs(util.List.of(groupResource))
+ configs = describeResult.all.get(15, TimeUnit.SECONDS)
+ assertEquals("55000",
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
+ assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG,
+
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source)
+
+ // Verify effective value is adjusted (55000 evaluated to new max 50000)
+ assertEquals(50000,
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+ }
+
@Test
def testCreatePartitions(): Unit = {
client = createAdminClient
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 88cd51a6750..f17ee7fe61d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -24,6 +24,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -43,6 +46,8 @@ import static
org.apache.kafka.common.config.ConfigDef.ValidString.in;
*/
public final class GroupConfig extends AbstractConfig {
+ private static final Logger log =
LoggerFactory.getLogger(GroupConfig.class);
+
public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG =
"consumer.session.timeout.ms";
public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG =
"consumer.heartbeat.interval.ms";
@@ -298,11 +303,11 @@ public final class GroupConfig extends AbstractConfig {
}
if (shareDeliveryCountLimit <
shareGroupConfig.shareGroupMinDeliveryCountLimit()) {
throw new
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be
greater than or equal to " +
-
ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
+ ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
}
if (shareDeliveryCountLimit >
shareGroupConfig.shareGroupMaxDeliveryCountLimit()) {
throw new
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be
less than or equal to " +
-
ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
+ ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
}
if (sharePartitionMaxRecordLocks <
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks()) {
throw new
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must
be greater than or equal to " +
@@ -347,15 +352,182 @@ public final class GroupConfig extends AbstractConfig {
}
/**
- * Check that the given properties contain only valid consumer group
config names and that all values can be
- * parsed and are valid.
+ * Check that the given properties contain only valid group config names
and that
+ * all values can be parsed and are valid. The provided properties are
merged with
+ * the broker-level defaults before validation.
*/
public static void validate(Properties props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
- validateNames(props);
- Map<?, ?> valueMaps = CONFIG.parse(props);
+ Properties combinedConfigs = new Properties();
+
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
+ combinedConfigs.putAll(props);
+
+ validateNames(combinedConfigs);
+ Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
}
+ /**
+ * Evaluate group config values to their effective values within
broker-level bounds.
+ * Out-of-range values are capped and a WARN log is emitted.
+ *
+ * @param props The raw group config properties.
+ * @param groupId The group id.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ * @return A new Properties with out-of-range values capped.
+ */
+ public static Properties evaluate(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ Properties effective = new Properties();
+ effective.putAll(props);
+ evaluateValues(effective, groupId, groupCoordinatorConfig,
shareGroupConfig);
+ return effective;
+ }
+
+ private static void evaluateValues(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ // Consumer group configs
+ clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+
+ // Share group configs
+ clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
+ clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+ shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+ shareGroupConfig.shareGroupMaxRecordLockDurationMs());
+ clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+ shareGroupConfig.shareGroupMaxDeliveryCountLimit());
+ clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+ shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+
+ // Streams group configs
+ clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
+ clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
+
+ // Verify that clamping did not break the session > heartbeat
invariant.
+ checkSessionExceedsHeartbeat(props, groupId,
+ CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
+ CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs());
+ checkSessionExceedsHeartbeat(props, groupId,
+ SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
+ SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs());
+ checkSessionExceedsHeartbeat(props, groupId,
+ STREAMS_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
+ STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs());
+ }
+
+ /**
+ * Log a WARN if the session timeout is not greater than the heartbeat
interval after
+ * evaluation. When a key is absent from props, the broker-level default
is used.
+ */
+ private static void checkSessionExceedsHeartbeat(
+ Properties props,
+ String groupId,
+ String sessionKey,
+ int defaultSession,
+ String heartbeatKey,
+ int defaultHeartbeat
+ ) {
+ Object rawSession = props.get(sessionKey);
+ Object rawHeartbeat = props.get(heartbeatKey);
+ if (rawSession == null && rawHeartbeat == null) return;
+
+ int session = rawSession != null ?
Integer.parseInt(rawSession.toString()) : defaultSession;
+ int heartbeat = rawHeartbeat != null ?
Integer.parseInt(rawHeartbeat.toString()) : defaultHeartbeat;
+ if (session <= heartbeat) {
+ log.warn("The effective {} ({}) for group '{}' is not greater than
{} ({}). "
+ + "Check that the broker-level min/max bounds for session
timeout "
+ + "and heartbeat interval do not overlap.",
+ sessionKey, session, groupId, heartbeatKey, heartbeat);
+ }
+ }
+
+ /**
+ * Clamp a config value to [min, max]. A WARN log is emitted on adjustment.
+ * No-op when the key is absent from props.
+ *
+ * @param props The properties to modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
+ * @param max The maximum allowed value (inclusive).
+ */
+ private static void clampToRange(
+ Properties props,
+ String groupId,
+ String key,
+ int min,
+ int max
+ ) {
+ Object rawValue = props.get(key);
+ if (rawValue == null) return;
+
+ int value = Integer.parseInt(rawValue.toString());
+ if (value < min) {
+ log.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
+ "allowed minimum {}. The effective value will be capped to
{}.",
+ key, groupId, value, min, min);
+ props.put(key, min);
+ } else if (value > max) {
+ log.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
+ "allowed maximum {}. The effective value will be capped to
{}.",
+ key, groupId, value, max, max);
+ props.put(key, max);
+ }
+ }
+
+ /**
+ * Clamp a config value to at most max. A WARN log is emitted on
adjustment.
+ * No-op when the key is absent from props.
+ *
+ * @param props The properties to modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param max The maximum allowed value (inclusive).
+ */
+ private static void clampToMax(
+ Properties props,
+ String groupId,
+ String key,
+ int max
+ ) {
+ Object rawValue = props.get(key);
+ if (rawValue == null) return;
+
+ int value = Integer.parseInt(rawValue.toString());
+ if (value > max) {
+ log.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
+ "allowed maximum {}. The effective value will be capped to
{}.",
+ key, groupId, value, max, max);
+ props.put(key, max);
+ }
+ }
+
/**
* Create a group config instance using the given properties and defaults.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 892dc9272ef..05ebecfc99c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -17,12 +17,12 @@
package org.apache.kafka.coordinator.group;
-import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,31 +36,50 @@ public class GroupConfigManager implements AutoCloseable {
private final Map<String, GroupConfig> configMap;
- public GroupConfigManager(Map<?, ?> defaultConfig) {
+ private final GroupCoordinatorConfig groupCoordinatorConfig;
+
+ private final ShareGroupConfig shareGroupConfig;
+
+ public GroupConfigManager(
+ Map<?, ?> defaultConfig,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
this.configMap = new ConcurrentHashMap<>();
this.defaultConfig = new GroupConfig(defaultConfig);
+ this.groupCoordinatorConfig =
Objects.requireNonNull(groupCoordinatorConfig);
+ this.shareGroupConfig = Objects.requireNonNull(shareGroupConfig);
}
/**
* Update the configuration of the provided group.
*
- * @param groupId The group id.
- * @param newGroupConfig The new group config.
+ * This method evaluates all configuration values within broker-level
bounds.
+ *
+ * @param groupId The group id.
+ * @param newGroupConfig The new group config.
*/
public void updateGroupConfig(String groupId, Properties newGroupConfig) {
if (null == groupId || groupId.isEmpty()) {
throw new InvalidRequestException("Group name can't be empty.");
}
+ // Evaluate ensures configs respect broker-level bounds. For the Admin
API path,
+ // values are pre-validated so this is effectively a no-op. For the
broker startup
+ // path, configs from metadata may need evaluation if bounds have
changed.
+ Properties evaluatedProps = GroupConfig.evaluate(
+ newGroupConfig, groupId, groupCoordinatorConfig, shareGroupConfig);
+
final GroupConfig newConfig = GroupConfig.fromProps(
defaultConfig.originals(),
- newGroupConfig
+ evaluatedProps
);
configMap.put(groupId, newConfig);
}
/**
* Get the group config if it exists, otherwise return None.
+ * The returned config has already been evaluated within broker-level
bounds.
*
* @param groupId The group id.
* @return The group config.
@@ -73,25 +92,6 @@ public class GroupConfigManager implements AutoCloseable {
return List.copyOf(configMap.keySet());
}
- /**
- * Validate the given properties.
- *
- * @param newGroupConfig The new group config.
- * @param groupCoordinatorConfig The group coordinator config.
- * @param shareGroupConfig The share group config.
- * @throws InvalidConfigurationException If validation fails.
- */
- public static void validate(
- Properties newGroupConfig,
- GroupCoordinatorConfig groupCoordinatorConfig,
- ShareGroupConfig shareGroupConfig
- ) {
- Properties combinedConfigs = new Properties();
-
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
- combinedConfigs.putAll(newGroupConfig);
- GroupConfig.validate(combinedConfigs, groupCoordinatorConfig,
shareGroupConfig);
- }
-
/**
* Remove all group configs.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 5a55748e334..cc808999bc0 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -666,7 +666,7 @@ public class GroupCoordinatorConfig {
public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig
shareGroupConfig) {
Map<String, Integer> defaultConfigs = new HashMap<>();
defaultConfigs.putAll(extractConsumerGroupConfigMap());
-
defaultConfigs.putAll(shareGroupConfig.extractShareGroupConfigMap(this));
+ defaultConfigs.putAll(extractShareGroupConfigMap(shareGroupConfig));
defaultConfigs.putAll(extractStreamsGroupConfigMap());
return Collections.unmodifiableMap(defaultConfigs);
}
@@ -680,6 +680,22 @@ public class GroupCoordinatorConfig {
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
consumerGroupHeartbeatIntervalMs());
}
+ /**
+ * Copy the subset of properties that are relevant to share group. These
configs include those which can be set
+ * statically (for all groups) or dynamically (for a specific group). In
those cases, the default value for the
+ * group specific dynamic config (Ex. share.session.timeout.ms) should be
the value set for the static config
+ * (Ex. group.share.session.timeout.ms).
+ */
+ public Map<String, Integer> extractShareGroupConfigMap(ShareGroupConfig
shareGroupConfig) {
+ return Map.of(
+ GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
this.shareGroupSessionTimeoutMs(),
+ GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
this.shareGroupHeartbeatIntervalMs(),
+ GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupConfig.shareGroupRecordLockDurationMs(),
+ GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupConfig.shareGroupDeliveryCountLimit(),
+ GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupConfig.shareGroupPartitionMaxRecordLocks()
+ );
+ }
+
/**
* Copy the subset of properties that are relevant to streams group.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 0ddfbe624e4..5c993f420b2 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -19,9 +19,9 @@ package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import java.util.Arrays;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@@ -137,6 +137,16 @@ public class ShareGroupConfig {
validate();
}
+ public static ShareGroupConfig fromProps(Map<?, ?> props) {
+ return new ShareGroupConfig(
+ new AbstractConfig(
+ Utils.mergeConfigs(Arrays.asList(CONFIG_DEF,
GroupCoordinatorConfig.CONFIG_DEF)),
+ props,
+ false
+ )
+ );
+ }
+
/** Share group configuration **/
public boolean isShareGroupEnabled() {
return isShareGroupEnabled;
@@ -213,20 +223,4 @@ public class ShareGroupConfig {
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));
}
-
- /**
- * Copy the subset of properties that are relevant to share group. These
configs include those which can be set
- * statically (for all groups) or dynamically (for a specific group). In
those cases, the default value for the
- * group specific dynamic config (Ex. share.session.timeout.ms) should be
the value set for the static config
- * (Ex. group.share.session.timeout.ms).
- */
- public Map<String, Integer>
extractShareGroupConfigMap(GroupCoordinatorConfig groupCoordinatorConfig) {
- return Map.of(
- GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
- GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs(),
- GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupRecordLockDurationMs(),
- GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupDeliveryCountLimit(),
- GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupPartitionMaxRecordLocks()
- );
- }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index d50ac1301d0..f258514f2a3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -17,16 +17,13 @@
package org.apache.kafka.coordinator.group;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.InvalidRequestException;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -34,8 +31,6 @@ import java.util.Properties;
import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -69,6 +64,12 @@ public class GroupConfigManagerTest {
assertFalse(groupConfig.isPresent());
}
+ @Test
+ public void testUpdateConfigWithNullGroupId() {
+ assertThrows(InvalidRequestException.class,
+ () -> configManager.updateGroupConfig(null, new Properties()));
+ }
+
@Test
public void testUpdateGroupConfig() {
String groupId = "foo";
@@ -86,43 +87,44 @@ public class GroupConfigManagerTest {
}
@Test
- public void testValidateUsesAllGroupTypeDefaults() {
- Map<String, Object> configs = new HashMap<>();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
46000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
46000);
-
- GroupCoordinatorConfig groupCoordinatorConfig =
createGroupCoordinatorConfig(configs);
- ShareGroupConfig shareGroupConfig = createShareGroupConfig();
-
- Properties newGroupConfig = new Properties();
- newGroupConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
"2");
-
- assertDoesNotThrow(() ->
- GroupConfigManager.validate(newGroupConfig,
groupCoordinatorConfig, shareGroupConfig));
+ public void testClampWithCustomBrokerBounds() {
+ Map<String, Object> overrides = new HashMap<>();
+
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
50000);
+
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
46000);
+
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG,
48000);
+ configManager = createConfigManager(overrides);
+
+ String groupId = "test-group";
+
+ // Value above custom max is clamped to custom max.
+ Properties props1 = new Properties();
+ props1.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 55000);
+ configManager.updateGroupConfig(groupId, props1);
+ assertEquals(50000,
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+
+ // Value below custom min is clamped to custom min.
+ Properties props2 = new Properties();
+ props2.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 44000);
+ configManager.updateGroupConfig(groupId, props2);
+ assertEquals(46000,
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+
+ // Value within custom range is stored as-is.
+ Properties props3 = new Properties();
+ props3.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 49000);
+ configManager.updateGroupConfig(groupId, props3);
+ assertEquals(49000,
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
}
public static GroupConfigManager createConfigManager() {
- Map<String, String> defaultConfig = new HashMap<>();
- defaultConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT));
- defaultConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT));
- defaultConfig.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
String.valueOf(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT));
- return new GroupConfigManager(defaultConfig);
+ return createConfigManager(new HashMap<>());
}
- private static GroupCoordinatorConfig
createGroupCoordinatorConfig(Map<String, Object> overrides) {
- Map<String, Object> configs = new HashMap<>(overrides);
- return new GroupCoordinatorConfig(new AbstractConfig(
- GroupCoordinatorConfig.CONFIG_DEF,
- configs,
- false
- ));
- }
+ public static GroupConfigManager createConfigManager(Map<String, Object>
overrides) {
+ GroupCoordinatorConfig groupCoordinatorConfig =
GroupCoordinatorConfig.fromProps(overrides);
+ ShareGroupConfig shareGroupConfig =
ShareGroupConfig.fromProps(overrides);
+
+ Map<String, Integer> defaultConfig = new
HashMap<>(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
- private static ShareGroupConfig createShareGroupConfig() {
- return new ShareGroupConfig(new AbstractConfig(
- Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF,
GroupCoordinatorConfig.CONFIG_DEF)),
- new HashMap<>(),
- false
- ));
+ return new GroupConfigManager(defaultConfig, groupCoordinatorConfig,
shareGroupConfig);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 130b87e52c8..2315d4496ce 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -23,14 +23,38 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-
+import java.util.stream.Stream;
+
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT;
+import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT;
+import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupConfigTest {
@@ -307,6 +331,163 @@ public class GroupConfigTest {
assertThrows(InvalidConfigurationException.class, () ->
GroupConfig.validate(props, createGroupCoordinatorConfig(),
createShareGroupConfig()));
}
+ @Test
+ public void testValidateWithAllGroupTypeConfigs() {
+ Map<String, Object> overrides = new HashMap<>();
+ // Consumer
+
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
46000);
+
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG,
46000);
+ // Streams
+
overrides.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
46000);
+
overrides.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
46000);
+ // Share
+
overrides.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
46000);
+
overrides.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG,
46000);
+
+ GroupCoordinatorConfig groupCoordinatorConfig =
GroupCoordinatorConfig.fromProps(overrides);
+ ShareGroupConfig shareGroupConfig =
ShareGroupConfig.fromProps(overrides);
+
+ assertDoesNotThrow(() ->
+ GroupConfig.validate(new Properties(), groupCoordinatorConfig,
shareGroupConfig));
+ }
+
+ @Test
+ public void testEvaluateEmptyPropsReturnsEmpty() {
+ Properties result = GroupConfig.evaluate(
+ new Properties(), "test-group",
+ GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testEvaluateDoesNotModifyInput() {
+ Properties props = new Properties();
+ props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 70000);
+
+ Properties propsSnapshot = new Properties();
+ propsSnapshot.putAll(props);
+
+ GroupConfig.evaluate(props, "test-group",
+ GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ assertEquals(propsSnapshot, props);
+ }
+
+ /**
+ * Data source for configs with bidirectional [min, max] evaluation.
+ * Each entry: (configKey, tooLow, expectedMin, tooHigh, expectedMax).
+ */
+ private static Stream<Arguments> rangeBoundedConfigs() {
+ return Stream.of(
+ // Consumer group configs
+ Arguments.of(
+ GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ 40000, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+ 70000, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ 3000, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ 20000, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+ ),
+ // Share group configs
+ Arguments.of(
+ GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ 40000, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+ 70000, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ 3000, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ 20000, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+ 10000, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT,
+ 70000, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ 1, SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT,
+ 15, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ 50, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT,
+ 5000, SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT
+ ),
+ // Streams group configs
+ Arguments.of(
+ GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ 40000, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+ 70000, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ 3000, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ 20000, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+ )
+ );
+ }
+
+ /**
+ * Data source for configs with max-only evaluation (no min bound enforced
by evaluate).
+ * Each entry: (configKey, tooHigh, expectedMax).
+ */
+ private static Stream<Arguments> maxBoundedConfigs() {
+ return Stream.of(
+ Arguments.of(
+ GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+ 5, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "testEvaluateValueAboveMaxIsCapped[{0}]")
+ @MethodSource("rangeBoundedConfigs")
+ public void testEvaluateValueAboveMaxIsCapped(
+ String key,
+ int tooLow,
+ int expectedMin,
+ int tooHigh,
+ int expectedMax
+ ) {
+ Properties props = new Properties();
+ props.put(key, tooHigh);
+ Properties result = GroupConfig.evaluate(props, "test-group",
+ GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ assertEquals(expectedMax, result.get(key));
+ }
+
+ @ParameterizedTest(name = "testEvaluateValueBelowMinIsCapped[{0}]")
+ @MethodSource("rangeBoundedConfigs")
+ public void testEvaluateValueBelowMinIsCapped(
+ String key,
+ int tooLow,
+ int expectedMin,
+ int tooHigh,
+ int expectedMax
+ ) {
+ Properties props = new Properties();
+ props.put(key, tooLow);
+ Properties result = GroupConfig.evaluate(props, "test-group",
+ GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ assertEquals(expectedMin, result.get(key));
+ }
+
+ @ParameterizedTest(name =
"testEvaluateMaxBoundedValueAboveMaxIsCapped[{0}]")
+ @MethodSource("maxBoundedConfigs")
+ public void testEvaluateMaxBoundedValueAboveMaxIsCapped(
+ String key,
+ int tooHigh,
+ int expectedMax
+ ) {
+ Properties props = new Properties();
+ props.put(key, tooHigh);
+ Properties result = GroupConfig.evaluate(props, "test-group",
+ GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ assertEquals(expectedMax, result.get(key));
+ }
+
private Properties createValidGroupConfig() {
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 154808578c2..837de8ce881 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20907,6 +20907,253 @@ public class GroupMetadataManagerTest {
context.assertNoRebalanceTimeout(groupId, memberId);
}
+ @Test
+ public void testConsumerGroupEvaluatedConfigs() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Session timer is scheduled on first heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result =
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(90000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+ assertEquals(1, result.response().memberEpoch());
+
+ // Verify default heartbeat interval and session timeout before config
update.
+
assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ result.response().heartbeatIntervalMs());
+ context.assertSessionTimeout(groupId, memberId,
+ GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+
+ // Advance time.
+ assertEquals(
+ List.of(),
+ context.sleep(result.response().heartbeatIntervalMs())
+ );
+
+ // Dynamic update group config with out-of-range values.
+ // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is
below min 5000.
+ Properties newGroupConfig = new Properties();
+ newGroupConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 70000);
+ newGroupConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+ context.updateGroupConfig(groupId, newGroupConfig);
+
+ // Session timer is rescheduled on second heartbeat.
+ result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(result.response().memberEpoch()));
+ assertEquals(1, result.response().memberEpoch());
+
+ // Verify heartbeat interval is evaluated to min.
+
assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ result.response().heartbeatIntervalMs());
+
+ // Verify session timeout is evaluated to max.
+ context.assertSessionTimeout(groupId, memberId,
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+ }
+
+ @Test
+ public void testShareGroupEvaluatedConfigs() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .buildCoordinatorMetadataImage())
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ CoordinatorMetadataImage image = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+
+ context.groupMetadataManager.onMetadataUpdate(image.emptyDelta(),
image);
+
+ // Session timer is scheduled on first heartbeat.
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData,
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result =
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of("foo")));
+ assertEquals(1, result.response().getKey().memberEpoch());
+
+ // Verify default heartbeat interval and session timeout before config
update.
+
assertEquals(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ result.response().getKey().heartbeatIntervalMs());
+ context.assertSessionTimeout(groupId, memberId,
+ GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+
+ // Advance time.
+ assertEquals(
+ List.of(),
+ context.sleep(result.response().getKey().heartbeatIntervalMs())
+ );
+
+ // Dynamic update group config with out-of-range values.
+ // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is
below min 5000.
+ Properties newGroupConfig = new Properties();
+ newGroupConfig.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 70000);
+ newGroupConfig.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+ context.updateGroupConfig(groupId, newGroupConfig);
+
+ // Replay ShareGroupStatePartitionMetadata required before second
heartbeat.
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializedTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(fooTopicId)
+ .setTopicName(fooTopicName)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ .setDeletingTopics(List.of())
+ );
+
+ // Session timer is rescheduled on second heartbeat.
+ result = context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(result.response().getKey().memberEpoch()));
+ assertEquals(1, result.response().getKey().memberEpoch());
+
+ // Verify heartbeat interval is evaluated to min.
+
assertEquals(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ result.response().getKey().heartbeatIntervalMs());
+
+ // Verify session timeout is evaluated to max.
+ context.assertSessionTimeout(groupId, memberId,
+ GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+ }
+
+ @Test
+ public void testStreamsGroupEvaluatedConfigs() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
+ .build();
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+ // Session timer is scheduled on first heartbeat.
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result =
+ context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(10000)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+ assertEquals(2, result.response().data().memberEpoch());
+
+ // Verify default heartbeat interval, session timeout, and
num.standby.replicas before config update.
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ result.response().data().heartbeatIntervalMs());
+ context.assertSessionTimeout(groupId, memberId,
+ GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+ assertEquals(Map.of("num.standby.replicas",
+
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)),
+ assignor.lastPassedAssignmentConfigs());
+
+ // Advance time.
+ assertEquals(
+ List.of(),
+ context.sleep(result.response().data().heartbeatIntervalMs())
+ );
+
+ // Dynamic update group config with out-of-range values.
+ // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is
below min 5000;
+ // num standby replicas 100 exceeds max 2.
+ Properties newGroupConfig = new Properties();
+ newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 70000);
+ newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+ newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 100);
+ context.updateGroupConfig(groupId, newGroupConfig);
+
+ // Session timer is rescheduled on second heartbeat, new assignment
with evaluated parameter is calculated.
+ result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(result.response().data().memberEpoch())
+ .setRackId("bla"));
+
+ // Verify heartbeat interval is evaluated to min.
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ result.response().data().heartbeatIntervalMs());
+
+ // Verify session timeout is evaluated to max.
+ context.assertSessionTimeout(groupId, memberId,
+
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+
+ // Verify that the number of standby replicas is evaluated to max.
+ assertEquals(Map.of("num.standby.replicas",
+
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)),
+ assignor.lastPassedAssignmentConfigs());
+ }
+
@Test
public void testReplayConsumerGroupMemberMetadata() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 9d5fdf0ffca..50ceedfda81 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -16,14 +16,11 @@
*/
package org.apache.kafka.coordinator.group.modern.share;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -169,7 +166,6 @@ public class ShareGroupConfigTest {
}
private static ShareGroupConfig createConfig(Map<String, Object> configs) {
- return new ShareGroupConfig(
- new
AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF,
GroupCoordinatorConfig.CONFIG_DEF)), configs, false));
+ return ShareGroupConfig.fromProps(configs);
}
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
index 7e782b292a3..f2f3b835c50 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -40,6 +40,7 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -103,6 +104,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
private TopicPartition topicPartition;
private MockTime time;
private GroupCoordinatorConfig config;
+ private ShareGroupConfig shareGroupConfig;
private GroupCoordinatorRecordSerde serde;
private GroupCoordinatorShard coordinatorShard;
private SnapshottableCoordinator<GroupCoordinatorShard, CoordinatorRecord>
snapshottableCoordinator;
@@ -269,6 +271,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
time = new MockTime();
Map<String, Object> props = new HashMap<>();
config = GroupCoordinatorConfig.fromProps(props);
+ shareGroupConfig = ShareGroupConfig.fromProps(props);
serde = new GroupCoordinatorRecordSerde();
}
@@ -287,7 +290,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
@Setup(Level.Invocation)
public void setupInvocation() {
- GroupConfigManager configManager = new GroupConfigManager(new
HashMap<>());
+ GroupConfigManager configManager = new GroupConfigManager(new
HashMap<>(), config, shareGroupConfig);
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);