This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8337d8368c6 KAFKA-20300: Add broker configs for assignment batching 
and offload (#21730)
8337d8368c6 is described below

commit 8337d8368c6f2a12c1f0f6e9493c03efc735c3c0
Author: Sean Quah <[email protected]>
AuthorDate: Fri Mar 13 06:52:39 2026 +0000

    KAFKA-20300: Add broker configs for assignment batching and offload (#21730)
    
    Add group.{consumer,share,streams}.assignment.interval.ms config options
    to control the delay between assignment calculations. These config
    options are dynamic at the broker level.
    
    Add group.{consumer,share,streams}.{min,max}.assignment.interval.ms
    config options to limit the
    group.{consumer,share,streams}.assignment.interval.ms config values.
    
    Add group.{consumer,share,streams}.assignor.offload.enable config
    options to control whether assignment calculation is offloaded to a
    group coordinator background thread. These config options are dynamic at
    the broker level.
    
    Reviewers: David Jacot <[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +
 .../kafka/server/DynamicBrokerConfigTest.scala     |  61 ++++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   9 +
 .../coordinator/group/GroupCoordinatorConfig.java  | 258 ++++++++++++++++++++-
 .../coordinator/group/GroupMetadataManager.java    |  48 ++++
 .../group/GroupCoordinatorConfigTest.java          |  63 +++++
 7 files changed, 434 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 80c5a1235f0..0fd64ef350d 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -430,6 +430,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     newProps ++= staticBrokerConfigs
     overrideProps(newProps, dynamicDefaultConfigs)
     overrideProps(newProps, dynamicBrokerConfigs)
+    KafkaConfig.clampDynamicConfigs(newProps.asJava)
 
     val oldConfig = currentConfig
     val (newConfig, brokerReconfigurablesToUpdate) = 
processReconfiguration(newProps, validateOnly = false, doLog)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9081eb058cb..c4707b68ff2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -70,6 +70,10 @@ object KafkaConfig {
   private[server] def defaultValues: Map[String, _] = 
configDef.defaultValues.asScala
   private[server] def configKeys: Map[String, ConfigKey] = 
configDef.configKeys.asScala
 
+  def clampDynamicConfigs(props: java.util.Map[String, String]): Unit = {
+    GroupCoordinatorConfig.clampDynamicConfigs(props)
+  }
+
   def fromProps(props: Properties): KafkaConfig =
     fromProps(props, true)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 24ed3dedfd9..00a74855ec4 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1180,20 +1180,65 @@ class DynamicBrokerConfigTest {
   }
 
   @Test
-  def testCoordinatorCachedBufferMaxBytesUpdates(): Unit = {
+  def testDynamicGroupCoordinatorConfig(): Unit = {
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG))
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
+    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
+
     val origProps = TestUtils.createBrokerConfig(0, port = 8181)
     origProps.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"2097152")
-    origProps.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"3145728")
-    val ctx = new DynamicLogConfigContext(origProps)
-    assertEquals(2 * 1024 * 1024, 
ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
-    assertEquals(3 * 1024 * 1024, 
ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+    
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 "500")
+    
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 "false")
+    
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"250")
+    
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 "false")
+    
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 "125")
+    
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 "false")
+    val config = KafkaConfig(origProps)
+    config.dynamicConfig.initialize(None)
+    assertEquals(2 * 1024 * 1024, 
config.groupCoordinatorConfig.cachedBufferMaxBytes())
+    assertEquals(500, 
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
+    assertEquals(false, 
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
+    assertEquals(250, 
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
+    assertEquals(false, 
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
+    assertEquals(125, 
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
+    assertEquals(false, 
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
 
     val props = new Properties()
     props.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "4194304")
+    
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"1000")
+    
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"true")
+    
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"500")
+    
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"true")
+    
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"250")
+    
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"true")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(4 * 1024 * 1024, 
config.groupCoordinatorConfig.cachedBufferMaxBytes())
+    assertEquals(1000, 
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
+    assertEquals(true, 
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
+    assertEquals(500, 
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
+    assertEquals(true, 
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
+    assertEquals(250, 
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
+    assertEquals(true, 
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
+  }
+
+  @Test
+  def testDynamicShareCoordinatorConfig(): Unit = {
+    
assertTrue(ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG))
+
+    val origProps = TestUtils.createBrokerConfig(0, port = 8181)
+    origProps.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"3145728")
+    val config = KafkaConfig(origProps)
+    config.dynamicConfig.initialize(None)
+    assertEquals(3 * 1024 * 1024, 
config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+
+    val props = new Properties()
     props.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "5242880")
-    ctx.config.dynamicConfig.updateDefaultConfig(props)
-    assertEquals(4 * 1024 * 1024, 
ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
-    assertEquals(5 * 1024 * 1024, 
ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(5 * 1024 * 1024, 
config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e726dbe3e72..b7465daf65c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1049,6 +1049,9 @@ class KafkaConfigTest {
         case 
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => // 
ignore string
+        case 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
         /** Share groups configs */
         case GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1066,6 +1069,9 @@ class KafkaConfigTest {
         case 
ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG =>  
//ignore string
+        case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
         /** Streams groups configs */
         case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1077,6 +1083,9 @@ class KafkaConfigTest {
         case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
         /** Share coordinator configs */
         case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index cc808999bc0..ff80a5c7ee6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -30,6 +30,9 @@ import 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
 import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +50,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
 import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -62,6 +66,9 @@ import static org.apache.kafka.common.utils.Utils.require;
  * Using local variable is advantageous as it avoids the overhead of 
repeatedly looking up these configurations in AbstractConfig.
  */
 public class GroupCoordinatorConfig {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GroupCoordinatorConfig.class);
+
     ///
     /// Group coordinator configs
     ///
@@ -228,6 +235,22 @@ public class GroupCoordinatorConfig {
         ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from 
consumer group to classic group is enabled, " +
         ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor 
downgrade is enabled.";
 
+    public static final String CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.consumer.assignment.interval.ms";
+    public static final String CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC = 
"The interval between assignment updates for a consumer group.";
+    public static final int CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT = 
1000;
+
+    public static final String 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.consumer.min.assignment.interval.ms";
+    public static final String CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC = 
"The minimum interval between assignment updates for a consumer group.";
+    public static final int CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT 
= 0;
+
+    public static final String 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.consumer.max.assignment.interval.ms";
+    public static final String CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC = 
"The maximum interval between assignment updates for a consumer group.";
+    public static final int CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT 
= 15000;
+
+    public static final String CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"group.consumer.assignor.offload.enable";
+    public static final String CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC = 
"Whether to offload consumer group assignment to a group coordinator background 
thread.";
+    public static final boolean CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT 
= true;
+
     public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG 
= "group.consumer.regex.refresh.interval.ms";
     public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC = 
"The interval at which the group coordinator will refresh " +
         "the topics matching the group subscribed regexes. This is only 
applicable to consumer groups using the consumer group protocol. ";
@@ -271,6 +294,22 @@ public class GroupCoordinatorConfig {
         SHARE_GROUP_BUILTIN_ASSIGNOR.name() + ".";
     public static final String SHARE_GROUP_ASSIGNORS_DEFAULT = 
SHARE_GROUP_BUILTIN_ASSIGNOR.name();
 
+    public static final String SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.share.assignment.interval.ms";
+    public static final String SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC = "The 
interval between assignment updates for a share group.";
+    public static final int SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT = 1000;
+
+    public static final String SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.share.min.assignment.interval.ms";
+    public static final String SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC = 
"The minimum interval between assignment updates for a share group.";
+    public static final int SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT = 0;
+
+    public static final String SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.share.max.assignment.interval.ms";
+    public static final String SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC = 
"The maximum interval between assignment updates for a share group.";
+    public static final int SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT = 
15000;
+
+    public static final String SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"group.share.assignor.offload.enable";
+    public static final String SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC = 
"Whether to offload share group assignment to a group coordinator background 
thread.";
+    public static final boolean SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT = 
true;
+
     public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG 
= "group.share.initialize.retry.interval.ms";
     // Because persister retries with exp backoff 5 times and upper cap of 30 
secs.
     public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 
30_000;
@@ -320,8 +359,30 @@ public class GroupCoordinatorConfig {
     public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 
3000;
     public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = 
"The amount of time the group coordinator will wait for more streams clients to 
join a new group before performing the first rebalance. A longer delay means 
potentially fewer rebalances.";
 
+    public static final String STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"group.streams.assignment.interval.ms";
+    public static final String STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC = "The 
interval between assignment updates for a streams group.";
+    public static final int STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT = 
1000;
+
+    public static final String STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG 
= "group.streams.min.assignment.interval.ms";
+    public static final String STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC = 
"The minimum interval between assignment updates for a streams group.";
+    public static final int STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT = 
0;
+
+    public static final String STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG 
= "group.streams.max.assignment.interval.ms";
+    public static final String STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC = 
"The maximum interval between assignment updates for a streams group.";
+    public static final int STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT = 
15000;
+
+    public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"group.streams.assignor.offload.enable";
+    public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC = 
"Whether to offload streams group assignment to a group coordinator background 
thread.";
+    public static final boolean STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT 
= true;
+
     public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
-        CACHED_BUFFER_MAX_BYTES_CONFIG
+        CACHED_BUFFER_MAX_BYTES_CONFIG,
+        CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+        CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+        SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+        SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+        STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+        STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG
     );
     
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -362,6 +423,10 @@ public class GroupCoordinatorConfig {
         .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, 
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, 
CONSUMER_GROUP_MAX_SIZE_DOC)
         .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, 
CONSUMER_GROUP_ASSIGNORS_DEFAULT, 
ConfigDef.ValidList.anyNonDuplicateValues(false, false), MEDIUM, 
CONSUMER_GROUP_ASSIGNORS_DOC)
         .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, 
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
 MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+        .define(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         // Interval config used for testing purposes.
         .defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM, 
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
 
@@ -374,6 +439,10 @@ public class GroupCoordinatorConfig {
         .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, 
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, 
SHARE_GROUP_MAX_SIZE_DOC)
         .define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST, 
SHARE_GROUP_ASSIGNORS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, 
false), MEDIUM, SHARE_GROUP_ASSIGNORS_DOC)
+        .define(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
 
         // Streams group configs
@@ -386,7 +455,11 @@ public class GroupCoordinatorConfig {
         .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, 
STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
         .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
         .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC)
-        .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, 
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
+        .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, 
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+        .define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC);
 
 
     /**
@@ -419,6 +492,8 @@ public class GroupCoordinatorConfig {
     private final int consumerGroupMaxSessionTimeoutMs;
     private final int consumerGroupMinHeartbeatIntervalMs;
     private final int consumerGroupMaxHeartbeatIntervalMs;
+    private final int consumerGroupMinAssignmentIntervalMs;
+    private final int consumerGroupMaxAssignmentIntervalMs;
     private final int consumerGroupRegexRefreshIntervalMs;
     // Share group configurations
     private final int shareGroupMaxSize;
@@ -429,6 +504,8 @@ public class GroupCoordinatorConfig {
     private final int shareGroupMinHeartbeatIntervalMs;
     private final int shareGroupMaxHeartbeatIntervalMs;
     private final List<ShareGroupPartitionAssignor> shareGroupAssignors;
+    private final int shareGroupMinAssignmentIntervalMs;
+    private final int shareGroupMaxAssignmentIntervalMs;
     private final int shareGroupInitializeRetryIntervalMs;
     // Streams group configurations
     private final int streamsGroupSessionTimeoutMs;
@@ -441,6 +518,8 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupNumStandbyReplicas;
     private final int streamsGroupMaxStandbyReplicas;
     private final int streamsGroupInitialRebalanceDelayMs;
+    private final int streamsGroupMinAssignmentIntervalMs;
+    private final int streamsGroupMaxAssignmentIntervalMs;
 
     private final AbstractConfig config;
 
@@ -474,6 +553,8 @@ public class GroupCoordinatorConfig {
         this.consumerGroupMaxSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
         this.consumerGroupMinHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.consumerGroupMaxHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.consumerGroupMinAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.consumerGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.consumerGroupRegexRefreshIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG);
         // Share group configurations
         this.shareGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
@@ -485,6 +566,8 @@ public class GroupCoordinatorConfig {
         this.shareGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
         this.shareGroupAssignors = shareGroupAssignors(config);
         int initializeRetryMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG);
+        this.shareGroupMinAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.shareGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.shareGroupInitializeRetryIntervalMs = Math.max(initializeRetryMs, 
this.offsetCommitTimeoutMs);
         // Streams group configurations
         this.streamsGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
@@ -497,6 +580,8 @@ public class GroupCoordinatorConfig {
         this.streamsGroupNumStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
         this.streamsGroupMaxStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
         this.streamsGroupInitialRebalanceDelayMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        this.streamsGroupMinAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.streamsGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.config = config;
 
         // New group coordinator configs validation.
@@ -515,6 +600,14 @@ public class GroupCoordinatorConfig {
                 String.format("%s must be less than or equal to %s", 
CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
         require(consumerGroupHeartbeatIntervalMs < 
consumerGroupSessionTimeoutMs,
                 String.format("%s must be less than %s", 
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+
+        require(consumerGroupMaxAssignmentIntervalMs >= 
consumerGroupMinAssignmentIntervalMs,
+                String.format("%s must be greater than or equal to %s", 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        require(consumerGroupAssignmentIntervalMs() >= 
consumerGroupMinAssignmentIntervalMs,
+                String.format("%s must be greater than or equal to %s", 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        require(consumerGroupAssignmentIntervalMs() <= 
consumerGroupMaxAssignmentIntervalMs,
+                String.format("%s must be less than or equal to %s", 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
         // Share group configs validation.
         require(shareGroupMaxHeartbeatIntervalMs >= 
shareGroupMinHeartbeatIntervalMs,
             String.format("%s must be greater than or equal to %s",
@@ -541,6 +634,17 @@ public class GroupCoordinatorConfig {
                 SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
         require(shareGroupAssignors.size() == 1,
             String.format("%s must contain exactly one assignor, but found 
%d", SHARE_GROUP_ASSIGNORS_CONFIG, shareGroupAssignors.size()));
+
+        require(shareGroupMaxAssignmentIntervalMs >= 
shareGroupMinAssignmentIntervalMs,
+            String.format("%s must be greater than or equal to %s",
+                SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        require(shareGroupAssignmentIntervalMs() >= 
shareGroupMinAssignmentIntervalMs,
+            String.format("%s must be greater than or equal to %s",
+                SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        require(shareGroupAssignmentIntervalMs() <= 
shareGroupMaxAssignmentIntervalMs,
+            String.format("%s must be less than or equal to %s",
+                SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
         // Streams group configs validation.
         require(streamsGroupMaxHeartbeatIntervalMs >= 
streamsGroupMinHeartbeatIntervalMs,
             String.format("%s must be greater than or equal to %s",
@@ -562,6 +666,72 @@ public class GroupCoordinatorConfig {
         require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
             String.format("%s must be less than %s",
                 STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+
+        require(streamsGroupMaxAssignmentIntervalMs >= 
streamsGroupMinAssignmentIntervalMs,
+            String.format("%s must be greater than or equal to %s",
+                STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        require(streamsGroupAssignmentIntervalMs() >= 
streamsGroupMinAssignmentIntervalMs,
+            String.format("%s must be greater than or equal to %s",
+                STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        require(streamsGroupAssignmentIntervalMs() <= 
streamsGroupMaxAssignmentIntervalMs,
+            String.format("%s must be less than or equal to %s",
+                STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+    }
+
+    /**
+     * Clamps dynamic group coordinator configs to within acceptable bounds.
+     * Out-of-range values are capped and a WARN log is emitted.
+     * <p>
+     * The consumer, share and streams assignment interval configs are each capped to the
+     * range given by their corresponding min and max assignment interval configs.
+     *
+     * @param props The full Kafka config containing properties to be clamped. Modified in place.
+     */
+    public static void clampDynamicConfigs(Map<String, String> props) {
+        // Parse configs but do not validate mins and maxes. This throwaway view is only used to
+        // look up the bounds, so pass doLog = false to keep AbstractConfig from logging every
+        // group coordinator config value each time the dynamic configs are clamped.
+        AbstractConfig groupCoordinatorConfig = new AbstractConfig(
+            GroupCoordinatorConfig.CONFIG_DEF,
+            props,
+            false
+        );
+
+        clampDynamicIntConfig(props, CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.getInt(CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG),
+            groupCoordinatorConfig.getInt(CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        clampDynamicIntConfig(props, SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.getInt(SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG),
+            groupCoordinatorConfig.getInt(SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        clampDynamicIntConfig(props, STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.getInt(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG),
+            groupCoordinatorConfig.getInt(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+    }
+
+    /**
+     * Clamp a config value to [min, max]. A WARN log is emitted on adjustment.
+     * No-op when the key is absent from props or when the value is already within range.
+     *
+     * @param props   The properties to modify in place.
+     * @param key     The config key.
+     * @param min     The minimum allowed value (inclusive).
+     * @param max     The maximum allowed value (inclusive).
+     * @throws NumberFormatException if the value for the key is not a valid integer.
+     */
+    private static void clampDynamicIntConfig(
+        Map<String, String> props,
+        String key,
+        int min,
+        int max
+    ) {
+        String rawValue = props.get(key);
+        if (rawValue == null) return;
+
+        // ConfigDef trims string values before parsing INT configs, so a padded value such as
+        // " 15000" is accepted when the props are parsed. Trim here as well so the same value
+        // does not fail with a NumberFormatException during clamping.
+        int value = Integer.parseInt(rawValue.trim());
+        if (value < min) {
+            LOG.warn("The config '{}' has value {} which is below the " +
+                    "allowed minimum {}. The effective value will be capped to {}.",
+                key, value, min, min);
+            props.put(key, String.valueOf(min));
+        } else if (value > max) {
+            LOG.warn("The config '{}' has value {} which exceeds the " +
+                    "allowed maximum {}. The effective value will be capped to {}.",
+                key, value, max, max);
+            props.put(key, String.valueOf(max));
+        }
     }
 
     public static GroupCoordinatorConfig fromProps(
@@ -918,6 +1088,34 @@ public class GroupCoordinatorConfig {
         return consumerGroupMaxHeartbeatIntervalMs;
     }
 
+    /**
+     * The interval, in milliseconds, between assignment updates for a consumer group.
+     * Looked up in the underlying config on every call rather than cached in a field.
+     */
+    public int consumerGroupAssignmentIntervalMs() {
+        return config.getInt(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * The lower bound for the interval between assignment updates for a consumer group.
+     */
+    public int consumerGroupMinAssignmentIntervalMs() {
+        return this.consumerGroupMinAssignmentIntervalMs;
+    }
+
+    /**
+     * The upper bound for the interval between assignment updates for a consumer group.
+     */
+    public int consumerGroupMaxAssignmentIntervalMs() {
+        return this.consumerGroupMaxAssignmentIntervalMs;
+    }
+
+    /**
+     * Whether consumer group assignment is offloaded to a group coordinator background thread.
+     * Looked up in the underlying config on every call rather than cached in a field.
+     */
+    public boolean consumerGroupAssignorOffloadEnable() {
+        return config.getBoolean(CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+    }
+
     /**
      * The consumer group regex batch refresh max interval in milliseconds.
      */
@@ -981,6 +1179,34 @@ public class GroupCoordinatorConfig {
         return shareGroupAssignors;
     }
 
+    /**
+     * The interval, in milliseconds, between assignment updates for a share group.
+     * Looked up in the underlying config on every call rather than cached in a field.
+     */
+    public int shareGroupAssignmentIntervalMs() {
+        return config.getInt(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * The lower bound for the interval between assignment updates for a share group.
+     */
+    public int shareGroupMinAssignmentIntervalMs() {
+        return this.shareGroupMinAssignmentIntervalMs;
+    }
+
+    /**
+     * The upper bound for the interval between assignment updates for a share group.
+     */
+    public int shareGroupMaxAssignmentIntervalMs() {
+        return this.shareGroupMaxAssignmentIntervalMs;
+    }
+
+    /**
+     * Whether share group assignment is offloaded to a group coordinator background thread.
+     * Looked up in the underlying config on every call rather than cached in a field.
+     */
+    public boolean shareGroupAssignorOffloadEnable() {
+        return config.getBoolean(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+    }
+
     /**
      * The share group initialize retry interval.
      */
@@ -1057,4 +1283,32 @@ public class GroupCoordinatorConfig {
     public int streamsGroupInitialRebalanceDelayMs() {
         return streamsGroupInitialRebalanceDelayMs;
     }
+
+    /**
+     * The interval, in milliseconds, between assignment updates for a streams group.
+     * Looked up in the underlying config on every call rather than cached in a field.
+     */
+    public int streamsGroupAssignmentIntervalMs() {
+        return config.getInt(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG);
+    }
+
+    /**
+     * The lower bound for the interval between assignment updates for a streams group.
+     */
+    public int streamsGroupMinAssignmentIntervalMs() {
+        return this.streamsGroupMinAssignmentIntervalMs;
+    }
+
+    /**
+     * The upper bound for the interval between assignment updates for a streams group.
+     */
+    public int streamsGroupMaxAssignmentIntervalMs() {
+        return this.streamsGroupMaxAssignmentIntervalMs;
+    }
+
+    /**
+     * Whether streams group assignment is offloaded to a group coordinator background thread.
+     * Looked up in the underlying config on every call rather than cached in a field.
+     */
+    public boolean streamsGroupAssignorOffloadEnable() {
+        return config.getBoolean(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ffcd909a2b9..4141e441aee 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8697,6 +8697,22 @@ public class GroupMetadataManager {
             .orElse(config.consumerGroupHeartbeatIntervalMs());
     }
 
+    /**
+     * Get the interval between assignment updates of the provided consumer group.
+     * The group id is not consulted here: the value from the group coordinator
+     * config is returned for every group.
+     *
+     * @param groupId The consumer group id.
+     * @return The assignment interval in milliseconds.
+     */
+    // package private for testing
+    int consumerGroupAssignmentIntervalMs(String groupId) {
+        return config.consumerGroupAssignmentIntervalMs();
+    }
+
+    /**
+     * Get whether to offload assignment for the provided consumer group.
+     * The group id is not consulted here: the value from the group coordinator
+     * config is returned for every group.
+     *
+     * @param groupId The consumer group id.
+     * @return The value of the consumer group assignor offload config.
+     */
+    // package private for testing
+    boolean consumerGroupAssignorOffloadEnable(String groupId) {
+        return config.consumerGroupAssignorOffloadEnable();
+    }
+
     /**
      * Get the session timeout of the provided share group.
      */
@@ -8715,6 +8731,22 @@ public class GroupMetadataManager {
             .orElse(config.shareGroupHeartbeatIntervalMs());
     }
 
+    /**
+     * Get the interval between assignment updates of the provided share group.
+     * The group id is not consulted here: the value from the group coordinator
+     * config is returned for every group.
+     *
+     * @param groupId The share group id.
+     * @return The assignment interval in milliseconds.
+     */
+    // package private for testing
+    int shareGroupAssignmentIntervalMs(String groupId) {
+        return config.shareGroupAssignmentIntervalMs();
+    }
+
+    /**
+     * Get whether to offload assignment for the provided share group.
+     * The group id is not consulted here: the value from the group coordinator
+     * config is returned for every group.
+     *
+     * @param groupId The share group id.
+     * @return The value of the share group assignor offload config.
+     */
+    // package private for testing
+    boolean shareGroupAssignorOffloadEnable(String groupId) {
+        return config.shareGroupAssignorOffloadEnable();
+    }
+
     /**
      * Get the session timeout of the provided streams group.
      */
@@ -8733,6 +8765,22 @@ public class GroupMetadataManager {
             .orElse(config.streamsGroupHeartbeatIntervalMs());
     }
 
+    /**
+     * Get the interval between assignment updates of the provided streams group.
+     * The group id is not consulted here: the value from the group coordinator
+     * config is returned for every group.
+     *
+     * @param groupId The streams group id.
+     * @return The assignment interval in milliseconds.
+     */
+    // package private for testing
+    int streamsGroupAssignmentIntervalMs(String groupId) {
+        return config.streamsGroupAssignmentIntervalMs();
+    }
+
+    /**
+     * Get whether to offload assignment for the provided streams group.
+     * The group id is not consulted here: the value from the group coordinator
+     * config is returned for every group.
+     *
+     * @param groupId The streams group id.
+     * @return The value of the streams group assignor offload config.
+     */
+    // package private for testing
+    boolean streamsGroupAssignorOffloadEnable(String groupId) {
+        return config.streamsGroupAssignorOffloadEnable();
+    }
+
     /**
      * Get the initial rebalance delay of the provided streams group.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 0208e627d6a..7319fcebdf4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -199,8 +199,20 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 666);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 111);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 222);
+        
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 500);
+        
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
 400);
+        
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
 600);
+        
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 false);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 15 * 60 * 1000);
+        
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
250);
+        
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
 150);
+        
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
 350);
+        
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
false);
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 5000);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
125);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
 25);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
 225);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 false);
         configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 2 * 
1024 * 1024);
 
         GroupCoordinatorConfig config = createConfig(configs);
@@ -231,8 +243,20 @@ public class GroupCoordinatorConfigTest {
         assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
         assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
         assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
+        assertEquals(500, config.consumerGroupAssignmentIntervalMs());
+        assertEquals(400, config.consumerGroupMinAssignmentIntervalMs());
+        assertEquals(600, config.consumerGroupMaxAssignmentIntervalMs());
+        assertEquals(false, config.consumerGroupAssignorOffloadEnable());
         assertEquals(15 * 60 * 1000, 
config.consumerGroupRegexRefreshIntervalMs());
+        assertEquals(250, config.shareGroupAssignmentIntervalMs());
+        assertEquals(150, config.shareGroupMinAssignmentIntervalMs());
+        assertEquals(350, config.shareGroupMaxAssignmentIntervalMs());
+        assertEquals(false, config.shareGroupAssignorOffloadEnable());
         assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
+        assertEquals(125, config.streamsGroupAssignmentIntervalMs());
+        assertEquals(25, config.streamsGroupMinAssignmentIntervalMs());
+        assertEquals(225, config.streamsGroupMaxAssignmentIntervalMs());
+        assertEquals(false, config.streamsGroupAssignorOffloadEnable());
         assertEquals(2 * 1024 * 1024, config.cachedBufferMaxBytes());
     }
 
@@ -337,6 +361,45 @@ public class GroupCoordinatorConfigTest {
             assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
     }
 
+    @Test
+    public void testClampDynamicConfigs() {
+        Map<String, String> consumerProps = Map.of(
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"30000",
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000"
+        );
+        testClampDynamicConfig(consumerProps, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000", 
"15000");
+        testClampDynamicConfig(consumerProps, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "45000", 
"45000");
+        testClampDynamicConfig(consumerProps, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000", 
"90000");
+
+        Map<String, String> shareProps = Map.of(
+            
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
+            
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000"
+        );
+        testClampDynamicConfig(shareProps, 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000", 
"15000");
+        testClampDynamicConfig(shareProps, 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "45000", 
"45000");
+        testClampDynamicConfig(shareProps, 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000", 
"90000");
+
+        Map<String, String> streamsProps = Map.of(
+            
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
+            
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000"
+        );
+        testClampDynamicConfig(streamsProps, 
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000", 
"15000");
+        testClampDynamicConfig(streamsProps, 
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "45000", 
"45000");
+        testClampDynamicConfig(streamsProps, 
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000", 
"90000");
+    }
+
+    private void testClampDynamicConfig(
+        Map<String, String> props,
+        String configName,
+        String expectedValue,
+        String value
+    ) {
+        // Clamping writes into the map it is given, so run it against a mutable copy
+        // of the base props with the config under test set to the supplied value.
+        Map<String, String> mutableProps = new HashMap<>(props);
+        mutableProps.put(configName, value);
+
+        GroupCoordinatorConfig.clampDynamicConfigs(mutableProps);
+
+        String clampedValue = mutableProps.get(configName);
+        assertEquals(expectedValue, clampedValue);
+    }
+
     @Test
     public void testAppendLingerMs() {
         GroupCoordinatorConfig config = 
createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG,
 -1));


Reply via email to