This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e36319010c2 KAFKA-20162: Validation and enforcement of group 
configurations (#21633)
e36319010c2 is described below

commit e36319010c21e5ef454062d0f2b85634c158fa21
Author: majialong <[email protected]>
AuthorDate: Wed Mar 11 03:27:51 2026 +0800

    KAFKA-20162: Validation and enforcement of group configurations (#21633)
    
    Implement KAFKA-20162 (KIP-1240) by adding broker-level bound evaluation
    for dynamic group configs.   Out-of-range values are capped to broker
    limits with WARN logs, while Admin validation remains strict and
    metadata-stored raw values are preserved.
    
    Reviewers: Sean Quah <[email protected]>, Andrew Schofield
    <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 .../server/ControllerConfigurationValidator.scala  |   4 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  46 ++++
 .../kafka/coordinator/group/GroupConfig.java       | 184 ++++++++++++++-
 .../coordinator/group/GroupConfigManager.java      |  48 ++--
 .../coordinator/group/GroupCoordinatorConfig.java  |  18 +-
 .../group/modern/share/ShareGroupConfig.java       |  28 +--
 .../coordinator/group/GroupConfigManagerTest.java  |  76 ++++---
 .../kafka/coordinator/group/GroupConfigTest.java   | 183 ++++++++++++++-
 .../group/GroupMetadataManagerTest.java            | 247 +++++++++++++++++++++
 .../group/modern/share/ShareGroupConfigTest.java   |   6 +-
 .../GroupCoordinatorShardLoadingBenchmark.java     |   5 +-
 12 files changed, 752 insertions(+), 95 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 698f8857881..8493321641f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -374,7 +374,7 @@ class BrokerServer(
       authorizerPlugin = config.createNewAuthorizer(metrics, 
ProcessRole.BrokerRole.toString)
 
       /* initializing the groupConfigManager */
-      groupConfigManager = new 
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
+      groupConfigManager = new 
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
 config.groupCoordinatorConfig, config.shareGroupConfig)
 
       /* create share coordinator */
       shareCoordinator = createShareCoordinator()
diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index f163a2739ae..89fdb1d4242 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -24,7 +24,7 @@ import 
org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRIC
 import org.apache.kafka.controller.ConfigurationValidator
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.coordinator.group.GroupConfigManager
+import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
 
@@ -139,7 +139,7 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
           throw new InvalidConfigurationException("Null value not supported 
for group configs: " +
             nullGroupConfigs.mkString(","))
         }
-        GroupConfigManager.validate(properties, 
kafkaConfig.groupCoordinatorConfig, kafkaConfig.shareGroupConfig)
+        GroupConfig.validate(properties, kafkaConfig.groupCoordinatorConfig, 
kafkaConfig.shareGroupConfig)
       case _ => throwExceptionForUnknownResourceType(resource)
     }
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 51be2b7b9f2..327b5ad2b36 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1157,6 +1157,52 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       "consumer.session.timeout.ms must be greater than or equal to 
group.consumer.min.session.timeout.ms")
   }
 
+  @Test
+  def testGroupConfigEvaluatedAfterBrokerRestart(): Unit = {
+    client = createAdminClient
+    val groupId = "evaluated-config-test-group"
+    val groupResource = new ConfigResource(ConfigResource.Type.GROUP, groupId)
+
+    // Set a valid group config (55000 is within default [45000, 60000])
+    val alterOps = util.List.of(
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "55000"), 
AlterConfigOp.OpType.SET)
+    )
+    val alterResult = 
client.incrementalAlterConfigs(util.Map.of(groupResource, alterOps))
+    alterResult.all.get(15, TimeUnit.SECONDS)
+    ensureConsistentKRaftMetadata()
+
+    // Verify stored value and effective value before restart
+    var describeResult = client.describeConfigs(util.List.of(groupResource))
+    var configs = describeResult.all.get(15, TimeUnit.SECONDS)
+    assertEquals("55000", 
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
+    // Before restart, 55000 is within [45000, 60000], so no adjustment needed
+    assertEquals(55000, 
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+
+    // Kill all brokers
+    client.close()
+    for (i <- 0 until brokerCount) {
+      killBroker(i)
+    }
+
+    // Change broker-level max to 50000 (making stored 55000 exceed the new 
max)
+    
serverConfig.setProperty(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 "50000")
+
+    // Restart brokers with new config (should not block startup)
+    restartDeadBrokers(reconfigure = true)
+    client = createAdminClient
+    ensureConsistentKRaftMetadata()
+
+    // Verify stored value is preserved (describeConfigs returns raw value)
+    describeResult = client.describeConfigs(util.List.of(groupResource))
+    configs = describeResult.all.get(15, TimeUnit.SECONDS)
+    assertEquals("55000", 
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
+    assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG,
+      
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source)
+
+    // Verify effective value is adjusted (55000 evaluated to new max 50000)
+    assertEquals(50000, 
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+  }
+
   @Test
   def testCreatePartitions(): Unit = {
     client = createAdminClient
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 88cd51a6750..f17ee7fe61d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -24,6 +24,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -43,6 +46,8 @@ import static 
org.apache.kafka.common.config.ConfigDef.ValidString.in;
  */
 public final class GroupConfig extends AbstractConfig {
 
+    private static final Logger log = 
LoggerFactory.getLogger(GroupConfig.class);
+
     public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
 
     public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
@@ -298,11 +303,11 @@ public final class GroupConfig extends AbstractConfig {
         }
         if (shareDeliveryCountLimit < 
shareGroupConfig.shareGroupMinDeliveryCountLimit()) {
             throw new 
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be 
greater than or equal to " +
-                    
ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
+                ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
         }
         if (shareDeliveryCountLimit > 
shareGroupConfig.shareGroupMaxDeliveryCountLimit()) {
             throw new 
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be 
less than or equal to " +
-                    
ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
+                ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
         }
         if (sharePartitionMaxRecordLocks < 
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks()) {
             throw new 
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must 
be greater than or equal to " +
@@ -347,15 +352,182 @@ public final class GroupConfig extends AbstractConfig {
     }
 
     /**
-     * Check that the given properties contain only valid consumer group 
config names and that all values can be
-     * parsed and are valid.
+     * Check that the given properties contain only valid group config names 
and that
+     * all values can be parsed and are valid. The provided properties are 
merged with
+     * the broker-level defaults before validation.
      */
     public static void validate(Properties props, GroupCoordinatorConfig 
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
-        validateNames(props);
-        Map<?, ?> valueMaps = CONFIG.parse(props);
+        Properties combinedConfigs = new Properties();
+        
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
+        combinedConfigs.putAll(props);
+
+        validateNames(combinedConfigs);
+        Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
         validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
     }
 
+    /**
+     * Evaluate group config values to their effective values within 
broker-level bounds.
+     * Out-of-range values are capped and a WARN log is emitted.
+     *
+     * @param props                  The raw group config properties.
+     * @param groupId                The group id.
+     * @param groupCoordinatorConfig The group coordinator config.
+     * @param shareGroupConfig       The share group config.
+     * @return A new Properties with out-of-range values capped.
+     */
+    public static Properties evaluate(
+        Properties props,
+        String groupId,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
+        Properties effective = new Properties();
+        effective.putAll(props);
+        evaluateValues(effective, groupId, groupCoordinatorConfig, 
shareGroupConfig);
+        return effective;
+    }
+
+    private static void evaluateValues(
+        Properties props,
+        String groupId,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
+        // Consumer group configs
+        clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+            groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
+        clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+            groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+
+        // Share group configs
+        clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+            groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
+        clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+            groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
+        clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+            shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+            shareGroupConfig.shareGroupMaxRecordLockDurationMs());
+        clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+            shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+            shareGroupConfig.shareGroupMaxDeliveryCountLimit());
+        clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+            shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+            shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+
+        // Streams group configs
+        clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+            groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
+        clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+            groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
+        clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
+
+        // Verify that clamping did not break the session > heartbeat 
invariant.
+        checkSessionExceedsHeartbeat(props, groupId,
+            CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
+            CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs());
+        checkSessionExceedsHeartbeat(props, groupId,
+            SHARE_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
+            SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs());
+        checkSessionExceedsHeartbeat(props, groupId,
+            STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
+            STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs());
+    }
+
+    /**
+     * Log a WARN if the session timeout is not greater than the heartbeat 
interval after
+     * evaluation. When a key is absent from props, the broker-level default 
is used.
+     */
+    private static void checkSessionExceedsHeartbeat(
+        Properties props,
+        String groupId,
+        String sessionKey,
+        int defaultSession,
+        String heartbeatKey,
+        int defaultHeartbeat
+    ) {
+        Object rawSession = props.get(sessionKey);
+        Object rawHeartbeat = props.get(heartbeatKey);
+        if (rawSession == null && rawHeartbeat == null) return;
+
+        int session = rawSession != null ? 
Integer.parseInt(rawSession.toString()) : defaultSession;
+        int heartbeat = rawHeartbeat != null ? 
Integer.parseInt(rawHeartbeat.toString()) : defaultHeartbeat;
+        if (session <= heartbeat) {
+            log.warn("The effective {} ({}) for group '{}' is not greater than 
{} ({}). "
+                    + "Check that the broker-level min/max bounds for session 
timeout "
+                    + "and heartbeat interval do not overlap.",
+                sessionKey, session, groupId, heartbeatKey, heartbeat);
+        }
+    }
+
+    /**
+     * Clamp a config value to [min, max]. A WARN log is emitted on adjustment.
+     * No-op when the key is absent from props.
+     *
+     * @param props   The properties to modify in place.
+     * @param groupId The group id.
+     * @param key     The config key.
+     * @param min     The minimum allowed value (inclusive).
+     * @param max     The maximum allowed value (inclusive).
+     */
+    private static void clampToRange(
+        Properties props,
+        String groupId,
+        String key,
+        int min,
+        int max
+    ) {
+        Object rawValue = props.get(key);
+        if (rawValue == null) return;
+
+        int value = Integer.parseInt(rawValue.toString());
+        if (value < min) {
+            log.warn("The group config '{}' for group '{}' has value {} which 
is below the broker's " +
+                    "allowed minimum {}. The effective value will be capped to 
{}.",
+                key, groupId, value, min, min);
+            props.put(key, min);
+        } else if (value > max) {
+            log.warn("The group config '{}' for group '{}' has value {} which 
exceeds the broker's " +
+                    "allowed maximum {}. The effective value will be capped to 
{}.",
+                key, groupId, value, max, max);
+            props.put(key, max);
+        }
+    }
+
+    /**
+     * Clamp a config value to at most max. A WARN log is emitted on 
adjustment.
+     * No-op when the key is absent from props.
+     *
+     * @param props   The properties to modify in place.
+     * @param groupId The group id.
+     * @param key     The config key.
+     * @param max     The maximum allowed value (inclusive).
+     */
+    private static void clampToMax(
+        Properties props,
+        String groupId,
+        String key,
+        int max
+    ) {
+        Object rawValue = props.get(key);
+        if (rawValue == null) return;
+
+        int value = Integer.parseInt(rawValue.toString());
+        if (value > max) {
+            log.warn("The group config '{}' for group '{}' has value {} which 
exceeds the broker's " +
+                    "allowed maximum {}. The effective value will be capped to 
{}.",
+                key, groupId, value, max, max);
+            props.put(key, max);
+        }
+    }
+
     /**
      * Create a group config instance using the given properties and defaults.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 892dc9272ef..05ebecfc99c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -17,12 +17,12 @@
 
 package org.apache.kafka.coordinator.group;
 
-import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,31 +36,50 @@ public class GroupConfigManager implements AutoCloseable {
 
     private final Map<String, GroupConfig> configMap;
 
-    public GroupConfigManager(Map<?, ?> defaultConfig) {
+    private final GroupCoordinatorConfig groupCoordinatorConfig;
+
+    private final ShareGroupConfig shareGroupConfig;
+
+    public GroupConfigManager(
+        Map<?, ?> defaultConfig,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
         this.configMap = new ConcurrentHashMap<>();
         this.defaultConfig = new GroupConfig(defaultConfig);
+        this.groupCoordinatorConfig = 
Objects.requireNonNull(groupCoordinatorConfig);
+        this.shareGroupConfig = Objects.requireNonNull(shareGroupConfig);
     }
 
     /**
      * Update the configuration of the provided group.
      *
-     * @param groupId                   The group id.
-     * @param newGroupConfig            The new group config.
+     * This method evaluates all configuration values within broker-level 
bounds.
+     *
+     * @param groupId        The group id.
+     * @param newGroupConfig The new group config.
      */
     public void updateGroupConfig(String groupId, Properties newGroupConfig) {
         if (null == groupId || groupId.isEmpty()) {
             throw new InvalidRequestException("Group name can't be empty.");
         }
 
+        // Evaluate ensures configs respect broker-level bounds. For the Admin 
API path,
+        // values are pre-validated so this is effectively a no-op. For the 
broker startup
+        // path, configs from metadata may need evaluation if bounds have 
changed.
+        Properties evaluatedProps = GroupConfig.evaluate(
+            newGroupConfig, groupId, groupCoordinatorConfig, shareGroupConfig);
+
         final GroupConfig newConfig = GroupConfig.fromProps(
             defaultConfig.originals(),
-            newGroupConfig
+            evaluatedProps
         );
         configMap.put(groupId, newConfig);
     }
 
     /**
      * Get the group config if it exists, otherwise return None.
+     * The returned config has already been evaluated within broker-level 
bounds.
      *
      * @param groupId  The group id.
      * @return The group config.
@@ -73,25 +92,6 @@ public class GroupConfigManager implements AutoCloseable {
         return List.copyOf(configMap.keySet());
     }
 
-    /**
-     * Validate the given properties.
-     *
-     * @param newGroupConfig         The new group config.
-     * @param groupCoordinatorConfig The group coordinator config.
-     * @param shareGroupConfig       The share group config.
-     * @throws InvalidConfigurationException If validation fails.
-     */
-    public static void validate(
-        Properties newGroupConfig,
-        GroupCoordinatorConfig groupCoordinatorConfig,
-        ShareGroupConfig shareGroupConfig
-    ) {
-        Properties combinedConfigs = new Properties();
-        
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
-        combinedConfigs.putAll(newGroupConfig);
-        GroupConfig.validate(combinedConfigs, groupCoordinatorConfig, 
shareGroupConfig);
-    }
-
     /**
      * Remove all group configs.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 5a55748e334..cc808999bc0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -666,7 +666,7 @@ public class GroupCoordinatorConfig {
     public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig 
shareGroupConfig) {
         Map<String, Integer> defaultConfigs = new HashMap<>();
         defaultConfigs.putAll(extractConsumerGroupConfigMap());
-        
defaultConfigs.putAll(shareGroupConfig.extractShareGroupConfigMap(this));
+        defaultConfigs.putAll(extractShareGroupConfigMap(shareGroupConfig));
         defaultConfigs.putAll(extractStreamsGroupConfigMap());
         return Collections.unmodifiableMap(defaultConfigs);
     }
@@ -680,6 +680,22 @@ public class GroupCoordinatorConfig {
             GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
consumerGroupHeartbeatIntervalMs());
     }
 
+    /**
+     * Copy the subset of properties that are relevant to share group. These 
configs include those which can be set
+     * statically (for all groups) or dynamically (for a specific group). In 
those cases, the default value for the
+     * group specific dynamic config (Ex. share.session.timeout.ms) should be 
the value set for the static config
+     * (Ex. group.share.session.timeout.ms).
+     */
+    public Map<String, Integer> extractShareGroupConfigMap(ShareGroupConfig 
shareGroupConfig) {
+        return Map.of(
+            GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, 
this.shareGroupSessionTimeoutMs(),
+            GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
this.shareGroupHeartbeatIntervalMs(),
+            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupConfig.shareGroupRecordLockDurationMs(),
+            GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupConfig.shareGroupDeliveryCountLimit(),
+            GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupConfig.shareGroupPartitionMaxRecordLocks()
+        );
+    }
+
     /**
      * Copy the subset of properties that are relevant to streams group.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 0ddfbe624e4..5c993f420b2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -19,9 +19,9 @@ package org.apache.kafka.coordinator.group.modern.share;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 
+import java.util.Arrays;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@@ -137,6 +137,16 @@ public class ShareGroupConfig {
         validate();
     }
 
+    public static ShareGroupConfig fromProps(Map<?, ?> props) {
+        return new ShareGroupConfig(
+            new AbstractConfig(
+                Utils.mergeConfigs(Arrays.asList(CONFIG_DEF, 
GroupCoordinatorConfig.CONFIG_DEF)),
+                props,
+                false
+            )
+        );
+    }
+
     /** Share group configuration **/
     public boolean isShareGroupEnabled() {
         return isShareGroupEnabled;
@@ -213,20 +223,4 @@ public class ShareGroupConfig {
                 String.format("%s must be greater than or equal to %s",
                         SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));
     }
-
-    /**
-     * Copy the subset of properties that are relevant to share group. These 
configs include those which can be set
-     * statically (for all groups) or dynamically (for a specific group). In 
those cases, the default value for the
-     * group specific dynamic config (Ex. share.session.timeout.ms) should be 
the value set for the static config
-     * (Ex. group.share.session.timeout.ms).
-     */
-    public Map<String, Integer> 
extractShareGroupConfigMap(GroupCoordinatorConfig groupCoordinatorConfig) {
-        return Map.of(
-            GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
-            GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs(),
-            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupRecordLockDurationMs(),
-            GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupDeliveryCountLimit(),
-            GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupPartitionMaxRecordLocks()
-        );
-    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index d50ac1301d0..f258514f2a3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -17,16 +17,13 @@
 
 package org.apache.kafka.coordinator.group;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.errors.InvalidRequestException;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -34,8 +31,6 @@ import java.util.Properties;
 
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -69,6 +64,12 @@ public class GroupConfigManagerTest {
         assertFalse(groupConfig.isPresent());
     }
 
+    @Test
+    public void testUpdateConfigWithNullGroupId() {
+        assertThrows(InvalidRequestException.class,
+            () -> configManager.updateGroupConfig(null, new Properties()));
+    }
+
     @Test
     public void testUpdateGroupConfig() {
         String groupId = "foo";
@@ -86,43 +87,44 @@ public class GroupConfigManagerTest {
     }
 
     @Test
-    public void testValidateUsesAllGroupTypeDefaults() {
-        Map<String, Object> configs = new HashMap<>();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
46000);
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
46000);
-
-        GroupCoordinatorConfig groupCoordinatorConfig = 
createGroupCoordinatorConfig(configs);
-        ShareGroupConfig shareGroupConfig = createShareGroupConfig();
-
-        Properties newGroupConfig = new Properties();
-        newGroupConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
"2");
-
-        assertDoesNotThrow(() ->
-            GroupConfigManager.validate(newGroupConfig, 
groupCoordinatorConfig, shareGroupConfig));
+    public void testClampWithCustomBrokerBounds() {
+        Map<String, Object> overrides = new HashMap<>();
+        
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 50000);
+        
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
 46000);
+        
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
48000);
+        configManager = createConfigManager(overrides);
+
+        String groupId = "test-group";
+
+        // Value above custom max is clamped to custom max.
+        Properties props1 = new Properties();
+        props1.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 55000);
+        configManager.updateGroupConfig(groupId, props1);
+        assertEquals(50000, 
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+
+        // Value below custom min is clamped to custom min.
+        Properties props2 = new Properties();
+        props2.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 44000);
+        configManager.updateGroupConfig(groupId, props2);
+        assertEquals(46000, 
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+
+        // Value within custom range is stored as-is.
+        Properties props3 = new Properties();
+        props3.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 49000);
+        configManager.updateGroupConfig(groupId, props3);
+        assertEquals(49000, 
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
     }
 
     public static GroupConfigManager createConfigManager() {
-        Map<String, String> defaultConfig = new HashMap<>();
-        defaultConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT));
-        defaultConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT));
-        defaultConfig.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
String.valueOf(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT));
-        return new GroupConfigManager(defaultConfig);
+        return createConfigManager(new HashMap<>());
     }
 
-    private static GroupCoordinatorConfig 
createGroupCoordinatorConfig(Map<String, Object> overrides) {
-        Map<String, Object> configs = new HashMap<>(overrides);
-        return new GroupCoordinatorConfig(new AbstractConfig(
-            GroupCoordinatorConfig.CONFIG_DEF,
-            configs,
-            false
-        ));
-    }
+    public static GroupConfigManager createConfigManager(Map<String, Object> 
overrides) {
+        GroupCoordinatorConfig groupCoordinatorConfig = 
GroupCoordinatorConfig.fromProps(overrides);
+        ShareGroupConfig shareGroupConfig = 
ShareGroupConfig.fromProps(overrides);
+
+        Map<String, Integer> defaultConfig = new 
HashMap<>(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
 
-    private static ShareGroupConfig createShareGroupConfig() {
-        return new ShareGroupConfig(new AbstractConfig(
-            Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, 
GroupCoordinatorConfig.CONFIG_DEF)),
-            new HashMap<>(),
-            false
-        ));
+        return new GroupConfigManager(defaultConfig, groupCoordinatorConfig, 
shareGroupConfig);
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 130b87e52c8..2315d4496ce 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -23,14 +23,38 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class GroupConfigTest {
 
@@ -307,6 +331,163 @@ public class GroupConfigTest {
         assertThrows(InvalidConfigurationException.class, () -> 
GroupConfig.validate(props, createGroupCoordinatorConfig(), 
createShareGroupConfig()));
     }
 
+    @Test
+    public void testValidateWithAllGroupTypeConfigs() {
+        Map<String, Object> overrides = new HashMap<>();
+        // Consumer
+        
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
 46000);
+        
overrides.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
46000);
+        // Streams
+        
overrides.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
 46000);
+        
overrides.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
46000);
+        // Share
+        
overrides.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
46000);
+        
overrides.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
46000);
+
+        GroupCoordinatorConfig groupCoordinatorConfig = 
GroupCoordinatorConfig.fromProps(overrides);
+        ShareGroupConfig shareGroupConfig = 
ShareGroupConfig.fromProps(overrides);
+
+        assertDoesNotThrow(() ->
+            GroupConfig.validate(new Properties(), groupCoordinatorConfig, 
shareGroupConfig));
+    }
+
+    @Test
+    public void testEvaluateEmptyPropsReturnsEmpty() {
+        Properties result = GroupConfig.evaluate(
+            new Properties(), "test-group",
+            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testEvaluateDoesNotModifyInput() {
+        Properties props = new Properties();
+        props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 70000);
+
+        Properties propsSnapshot = new Properties();
+        propsSnapshot.putAll(props);
+
+        GroupConfig.evaluate(props, "test-group",
+            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+        assertEquals(propsSnapshot, props);
+    }
+
+    /**
+     * Data source for configs with bidirectional [min, max] evaluation.
+     * Each entry: (configKey, tooLow, expectedMin, tooHigh, expectedMax).
+     */
+    private static Stream<Arguments> rangeBoundedConfigs() {
+        return Stream.of(
+            // Consumer group configs
+            Arguments.of(
+                GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+                40000, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+                70000, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+                3000, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+                20000, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+            ),
+            // Share group configs
+            Arguments.of(
+                GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
+                40000, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+                70000, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+                3000, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+                20000, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+                10000, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT,
+                70000, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+                1, SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT,
+                15, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+                50, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT,
+                5000, SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT
+            ),
+            // Streams group configs
+            Arguments.of(
+                GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+                40000, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+                70000, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+                3000, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+                20000, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+            )
+        );
+    }
+
+    /**
+     * Data source for configs with max-only evaluation (no min bound enforced 
by evaluate).
+     * Each entry: (configKey, tooHigh, expectedMax).
+     */
+    private static Stream<Arguments> maxBoundedConfigs() {
+        return Stream.of(
+            Arguments.of(
+                GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+                5, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT
+            )
+        );
+    }
+
+    @ParameterizedTest(name = "testEvaluateValueAboveMaxIsCapped[{0}]")
+    @MethodSource("rangeBoundedConfigs")
+    public void testEvaluateValueAboveMaxIsCapped(
+        String key,
+        int tooLow,
+        int expectedMin,
+        int tooHigh,
+        int expectedMax
+    ) {
+        Properties props = new Properties();
+        props.put(key, tooHigh);
+        Properties result = GroupConfig.evaluate(props, "test-group",
+            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+        assertEquals(expectedMax, result.get(key));
+    }
+
+    @ParameterizedTest(name = "testEvaluateValueBelowMinIsCapped[{0}]")
+    @MethodSource("rangeBoundedConfigs")
+    public void testEvaluateValueBelowMinIsCapped(
+        String key,
+        int tooLow,
+        int expectedMin,
+        int tooHigh,
+        int expectedMax
+    ) {
+        Properties props = new Properties();
+        props.put(key, tooLow);
+        Properties result = GroupConfig.evaluate(props, "test-group",
+            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+        assertEquals(expectedMin, result.get(key));
+    }
+
+    @ParameterizedTest(name = 
"testEvaluateMaxBoundedValueAboveMaxIsCapped[{0}]")
+    @MethodSource("maxBoundedConfigs")
+    public void testEvaluateMaxBoundedValueAboveMaxIsCapped(
+        String key,
+        int tooHigh,
+        int expectedMax
+    ) {
+        Properties props = new Properties();
+        props.put(key, tooHigh);
+        Properties result = GroupConfig.evaluate(props, "test-group",
+            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+        assertEquals(expectedMax, result.get(key));
+    }
+
     private Properties createValidGroupConfig() {
         Properties props = new Properties();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 154808578c2..837de8ce881 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20907,6 +20907,253 @@ public class GroupMetadataManagerTest {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testConsumerGroupEvaluatedConfigs() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setTopicPartitions(List.of()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify default heartbeat interval and session timeout before config 
update.
+        
assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            result.response().heartbeatIntervalMs());
+        context.assertSessionTimeout(groupId, memberId,
+            GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config with out-of-range values.
+        // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is 
below min 5000.
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 70000);
+        newGroupConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+        context.updateGroupConfig(groupId, newGroupConfig);
+
+        // Session timer is rescheduled on second heartbeat.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().memberEpoch()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify heartbeat interval is evaluated to min.
+        
assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            result.response().heartbeatIntervalMs());
+
+        // Verify session timeout is evaluated to max.
+        context.assertSessionTimeout(groupId, memberId,
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+    }
+
+    @Test
+    public void testShareGroupEvaluatedConfigs() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .buildCoordinatorMetadataImage())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorMetadataImage image = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        context.groupMetadataManager.onMetadataUpdate(image.emptyDelta(), 
image);
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, 
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result =
+            context.shareGroupHeartbeat(
+                new ShareGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setSubscribedTopicNames(List.of("foo")));
+        assertEquals(1, result.response().getKey().memberEpoch());
+
+        // Verify default heartbeat interval and session timeout before config 
update.
+        
assertEquals(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            result.response().getKey().heartbeatIntervalMs());
+        context.assertSessionTimeout(groupId, memberId,
+            GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().getKey().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config with out-of-range values.
+        // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is 
below min 5000.
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 70000);
+        newGroupConfig.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+        context.updateGroupConfig(groupId, newGroupConfig);
+
+        // Replay ShareGroupStatePartitionMetadata required before second 
heartbeat.
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializedTopics(List.of(
+                    new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                        .setTopicId(fooTopicId)
+                        .setTopicName(fooTopicName)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setDeletingTopics(List.of())
+        );
+
+        // Session timer is rescheduled on second heartbeat.
+        result = context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().getKey().memberEpoch()));
+        assertEquals(1, result.response().getKey().memberEpoch());
+
+        // Verify heartbeat interval is evaluated to min.
+        
assertEquals(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            result.response().getKey().heartbeatIntervalMs());
+
+        // Verify session timeout is evaluated to max.
+        context.assertSessionTimeout(groupId, memberId,
+            GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+    }
+
+    @Test
+    public void testStreamsGroupEvaluatedConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .buildCoordinatorMetadataImage())
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 0)
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(2, result.response().data().memberEpoch());
+
+        // Verify default heartbeat interval, session timeout, and 
num.standby.replicas before config update.
+        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            result.response().data().heartbeatIntervalMs());
+        context.assertSessionTimeout(groupId, memberId,
+            GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+        assertEquals(Map.of("num.standby.replicas",
+                
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)),
+            assignor.lastPassedAssignmentConfigs());
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().data().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config with out-of-range values.
+        // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is 
below min 5000;
+        // num standby replicas 100 exceeds max 2.
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 70000);
+        newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+        newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 100);
+        context.updateGroupConfig(groupId, newGroupConfig);
+
+        // Session timer is rescheduled on second heartbeat, new assignment 
with evaluated parameter is calculated.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch())
+                .setRackId("bla"));
+
+        // Verify heartbeat interval is evaluated to min.
+        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            result.response().data().heartbeatIntervalMs());
+
+        // Verify session timeout is evaluated to max.
+        context.assertSessionTimeout(groupId, memberId,
+            
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+
+        // Verify that the number of standby replicas is evaluated to max.
+        assertEquals(Map.of("num.standby.replicas",
+                
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)),
+            assignor.lastPassedAssignmentConfigs());
+    }
+
     @Test
     public void testReplayConsumerGroupMemberMetadata() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 9d5fdf0ffca..50ceedfda81 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -16,14 +16,11 @@
  */
 package org.apache.kafka.coordinator.group.modern.share;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -169,7 +166,6 @@ public class ShareGroupConfigTest {
     }
 
     private static ShareGroupConfig createConfig(Map<String, Object> configs) {
-        return new ShareGroupConfig(
-            new 
AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, 
GroupCoordinatorConfig.CONFIG_DEF)), configs, false));
+        return ShareGroupConfig.fromProps(configs);
     }
 }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
index 7e782b292a3..f2f3b835c50 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -40,6 +40,7 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
 import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -103,6 +104,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
     private TopicPartition topicPartition;
     private MockTime time;
     private GroupCoordinatorConfig config;
+    private ShareGroupConfig shareGroupConfig;
     private GroupCoordinatorRecordSerde serde;
     private GroupCoordinatorShard coordinatorShard;
     private SnapshottableCoordinator<GroupCoordinatorShard, CoordinatorRecord> 
snapshottableCoordinator;
@@ -269,6 +271,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
         time = new MockTime();
         Map<String, Object> props = new HashMap<>();
         config = GroupCoordinatorConfig.fromProps(props);
+        shareGroupConfig = ShareGroupConfig.fromProps(props);
         serde = new GroupCoordinatorRecordSerde();
     }
 
@@ -287,7 +290,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
 
     @Setup(Level.Invocation)
     public void setupInvocation() {
-        GroupConfigManager configManager = new GroupConfigManager(new 
HashMap<>());
+        GroupConfigManager configManager = new GroupConfigManager(new 
HashMap<>(), config, shareGroupConfig);
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
 

Reply via email to