This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8337d8368c6 KAFKA-20300: Add broker configs for assignment batching
and offload (#21730)
8337d8368c6 is described below
commit 8337d8368c6f2a12c1f0f6e9493c03efc735c3c0
Author: Sean Quah <[email protected]>
AuthorDate: Fri Mar 13 06:52:39 2026 +0000
KAFKA-20300: Add broker configs for assignment batching and offload (#21730)
Add group.{consumer,share,streams}.assignment.interval.ms config options
to control the delay between assignment calculation. These config
options are dynamic at the broker level.
Add group.{consumer,share,streams}.{min,max}.assignment.interval.ms
config options to limit the
group.{consumer,share,streams}.assignment.interval.ms config values.
Add group.{consumer,share,streams}.assignor.offload.enable config
options to control whether assignment calculation is offloaded to a
group coordinator background thread. These config options are dynamic at
the broker level.
Reviewers: David Jacot <[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +
.../kafka/server/DynamicBrokerConfigTest.scala | 61 ++++-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 9 +
.../coordinator/group/GroupCoordinatorConfig.java | 258 ++++++++++++++++++++-
.../coordinator/group/GroupMetadataManager.java | 48 ++++
.../group/GroupCoordinatorConfigTest.java | 63 +++++
7 files changed, 434 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 80c5a1235f0..0fd64ef350d 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -430,6 +430,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
newProps ++= staticBrokerConfigs
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, dynamicBrokerConfigs)
+ KafkaConfig.clampDynamicConfigs(newProps.asJava)
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) =
processReconfiguration(newProps, validateOnly = false, doLog)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9081eb058cb..c4707b68ff2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -70,6 +70,10 @@ object KafkaConfig {
private[server] def defaultValues: Map[String, _] =
configDef.defaultValues.asScala
private[server] def configKeys: Map[String, ConfigKey] =
configDef.configKeys.asScala
+ def clampDynamicConfigs(props: java.util.Map[String, String]): Unit = {
+ GroupCoordinatorConfig.clampDynamicConfigs(props)
+ }
+
def fromProps(props: Properties): KafkaConfig =
fromProps(props, true)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 24ed3dedfd9..00a74855ec4 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1180,20 +1180,65 @@ class DynamicBrokerConfigTest {
}
@Test
- def testCoordinatorCachedBufferMaxBytesUpdates(): Unit = {
+ def testDynamicGroupCoordinatorConfig(): Unit = {
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG))
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
+
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
+
val origProps = TestUtils.createBrokerConfig(0, port = 8181)
origProps.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG,
"2097152")
- origProps.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG,
"3145728")
- val ctx = new DynamicLogConfigContext(origProps)
- assertEquals(2 * 1024 * 1024,
ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
- assertEquals(3 * 1024 * 1024,
ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500")
+
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false")
+
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"250")
+
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false")
+
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"125")
+
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false")
+ val config = KafkaConfig(origProps)
+ config.dynamicConfig.initialize(None)
+ assertEquals(2 * 1024 * 1024,
config.groupCoordinatorConfig.cachedBufferMaxBytes())
+ assertEquals(500,
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
+ assertEquals(false,
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
+ assertEquals(250,
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
+ assertEquals(false,
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
+ assertEquals(125,
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
+ assertEquals(false,
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
val props = new Properties()
props.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "4194304")
+
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"1000")
+
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"true")
+
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500")
+
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"true")
+
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"250")
+
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"true")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(4 * 1024 * 1024,
config.groupCoordinatorConfig.cachedBufferMaxBytes())
+ assertEquals(1000,
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
+ assertEquals(true,
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
+ assertEquals(500,
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
+ assertEquals(true,
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
+ assertEquals(250,
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
+ assertEquals(true,
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
+ }
+
+ @Test
+ def testDynamicShareCoordinatorConfig(): Unit = {
+
assertTrue(ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG))
+
+ val origProps = TestUtils.createBrokerConfig(0, port = 8181)
+ origProps.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG,
"3145728")
+ val config = KafkaConfig(origProps)
+ config.dynamicConfig.initialize(None)
+ assertEquals(3 * 1024 * 1024,
config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+
+ val props = new Properties()
props.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "5242880")
- ctx.config.dynamicConfig.updateDefaultConfig(props)
- assertEquals(4 * 1024 * 1024,
ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
- assertEquals(5 * 1024 * 1024,
ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(5 * 1024 * 1024,
config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e726dbe3e72..b7465daf65c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1049,6 +1049,9 @@ class KafkaConfigTest {
case
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => //
ignore string
+ case
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
/** Share groups configs */
case GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1066,6 +1069,9 @@ class KafkaConfigTest {
case
ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG =>
//ignore string
+ case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1077,6 +1083,9 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
/** Share coordinator configs */
case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index cc808999bc0..ff80a5c7ee6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -30,6 +30,9 @@ import
org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -47,6 +50,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -62,6 +66,9 @@ import static org.apache.kafka.common.utils.Utils.require;
* Using local variable is advantageous as it avoids the overhead of
repeatedly looking up these configurations in AbstractConfig.
*/
public class GroupCoordinatorConfig {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GroupCoordinatorConfig.class);
+
///
/// Group coordinator configs
///
@@ -228,6 +235,22 @@ public class GroupCoordinatorConfig {
ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from
consumer group to classic group is enabled, " +
ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor
downgrade is enabled.";
+ public static final String CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.consumer.assignment.interval.ms";
+ public static final String CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC =
"The interval between assignment updates for a consumer group.";
+ public static final int CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT =
1000;
+
+ public static final String
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.consumer.min.assignment.interval.ms";
+ public static final String CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC =
"The minimum interval between assignment updates for a consumer group.";
+ public static final int CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT
= 0;
+
+ public static final String
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.consumer.max.assignment.interval.ms";
+ public static final String CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC =
"The maximum interval between assignment updates for a consumer group.";
+ public static final int CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
= 15000;
+
+ public static final String CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"group.consumer.assignor.offload.enable";
+ public static final String CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC =
"Whether to offload consumer group assignment to a group coordinator background
thread.";
+ public static final boolean CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT
= true;
+
public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG
= "group.consumer.regex.refresh.interval.ms";
public static final String CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC =
"The interval at which the group coordinator will refresh " +
"the topics matching the group subscribed regexes. This is only
applicable to consumer groups using the consumer group protocol. ";
@@ -271,6 +294,22 @@ public class GroupCoordinatorConfig {
SHARE_GROUP_BUILTIN_ASSIGNOR.name() + ".";
public static final String SHARE_GROUP_ASSIGNORS_DEFAULT =
SHARE_GROUP_BUILTIN_ASSIGNOR.name();
+ public static final String SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.share.assignment.interval.ms";
+ public static final String SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC = "The
interval between assignment updates for a share group.";
+ public static final int SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT = 1000;
+
+ public static final String SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.share.min.assignment.interval.ms";
+ public static final String SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC =
"The minimum interval between assignment updates for a share group.";
+ public static final int SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT = 0;
+
+ public static final String SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.share.max.assignment.interval.ms";
+ public static final String SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC =
"The maximum interval between assignment updates for a share group.";
+ public static final int SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT =
15000;
+
+ public static final String SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"group.share.assignor.offload.enable";
+ public static final String SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC =
"Whether to offload share group assignment to a group coordinator background
thread.";
+ public static final boolean SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT =
true;
+
public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG
= "group.share.initialize.retry.interval.ms";
// Because persister retries with exp backoff 5 times and upper cap of 30
secs.
public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT =
30_000;
@@ -320,8 +359,30 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT =
3000;
public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC =
"The amount of time the group coordinator will wait for more streams clients to
join a new group before performing the first rebalance. A longer delay means
potentially fewer rebalances.";
+ public static final String STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG =
"group.streams.assignment.interval.ms";
+ public static final String STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC = "The
interval between assignment updates for a streams group.";
+ public static final int STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT =
1000;
+
+ public static final String STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG
= "group.streams.min.assignment.interval.ms";
+ public static final String STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC =
"The minimum interval between assignment updates for a streams group.";
+ public static final int STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT =
0;
+
+ public static final String STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
= "group.streams.max.assignment.interval.ms";
+ public static final String STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC =
"The maximum interval between assignment updates for a streams group.";
+ public static final int STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT =
15000;
+
+ public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"group.streams.assignor.offload.enable";
+ public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC =
"Whether to offload streams group assignment to a group coordinator background
thread.";
+ public static final boolean STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT
= true;
+
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
- CACHED_BUFFER_MAX_BYTES_CONFIG
+ CACHED_BUFFER_MAX_BYTES_CONFIG,
+ CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+ SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+ STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG
);
public static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -362,6 +423,10 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT,
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST,
CONSUMER_GROUP_ASSIGNORS_DEFAULT,
ConfigDef.ValidList.anyNonDuplicateValues(false, false), MEDIUM,
CONSUMER_GROUP_ASSIGNORS_DOC)
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+ .define(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
// Interval config used for testing purposes.
.defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM,
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
@@ -374,6 +439,10 @@ public class GroupCoordinatorConfig {
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT,
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM,
SHARE_GROUP_MAX_SIZE_DOC)
.define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST,
SHARE_GROUP_ASSIGNORS_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false,
false), MEDIUM, SHARE_GROUP_ASSIGNORS_DOC)
+ .define(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
// Streams group configs
@@ -386,7 +455,11 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT,
STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT,
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC)
- .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
+ .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+ .define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC);
/**
@@ -419,6 +492,8 @@ public class GroupCoordinatorConfig {
private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs;
private final int consumerGroupMaxHeartbeatIntervalMs;
+ private final int consumerGroupMinAssignmentIntervalMs;
+ private final int consumerGroupMaxAssignmentIntervalMs;
private final int consumerGroupRegexRefreshIntervalMs;
// Share group configurations
private final int shareGroupMaxSize;
@@ -429,6 +504,8 @@ public class GroupCoordinatorConfig {
private final int shareGroupMinHeartbeatIntervalMs;
private final int shareGroupMaxHeartbeatIntervalMs;
private final List<ShareGroupPartitionAssignor> shareGroupAssignors;
+ private final int shareGroupMinAssignmentIntervalMs;
+ private final int shareGroupMaxAssignmentIntervalMs;
private final int shareGroupInitializeRetryIntervalMs;
// Streams group configurations
private final int streamsGroupSessionTimeoutMs;
@@ -441,6 +518,8 @@ public class GroupCoordinatorConfig {
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
private final int streamsGroupInitialRebalanceDelayMs;
+ private final int streamsGroupMinAssignmentIntervalMs;
+ private final int streamsGroupMaxAssignmentIntervalMs;
private final AbstractConfig config;
@@ -474,6 +553,8 @@ public class GroupCoordinatorConfig {
this.consumerGroupMaxSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxHeartbeatIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.consumerGroupMinAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.consumerGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.consumerGroupRegexRefreshIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG);
// Share group configurations
this.shareGroupSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
@@ -485,6 +566,8 @@ public class GroupCoordinatorConfig {
this.shareGroupMaxSize =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
this.shareGroupAssignors = shareGroupAssignors(config);
int initializeRetryMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG);
+ this.shareGroupMinAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.shareGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.shareGroupInitializeRetryIntervalMs = Math.max(initializeRetryMs,
this.offsetCommitTimeoutMs);
// Streams group configurations
this.streamsGroupSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
@@ -497,6 +580,8 @@ public class GroupCoordinatorConfig {
this.streamsGroupNumStandbyReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
this.streamsGroupInitialRebalanceDelayMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+ this.streamsGroupMinAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.streamsGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.config = config;
// New group coordinator configs validation.
@@ -515,6 +600,14 @@ public class GroupCoordinatorConfig {
String.format("%s must be less than or equal to %s",
CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupHeartbeatIntervalMs <
consumerGroupSessionTimeoutMs,
String.format("%s must be less than %s",
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+
+ require(consumerGroupMaxAssignmentIntervalMs >=
consumerGroupMinAssignmentIntervalMs,
+ String.format("%s must be greater than or equal to %s",
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ require(consumerGroupAssignmentIntervalMs() >=
consumerGroupMinAssignmentIntervalMs,
+ String.format("%s must be greater than or equal to %s",
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ require(consumerGroupAssignmentIntervalMs() <=
consumerGroupMaxAssignmentIntervalMs,
+ String.format("%s must be less than or equal to %s",
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
// Share group configs validation.
require(shareGroupMaxHeartbeatIntervalMs >=
shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
@@ -541,6 +634,17 @@ public class GroupCoordinatorConfig {
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupAssignors.size() == 1,
String.format("%s must contain exactly one assignor, but found
%d", SHARE_GROUP_ASSIGNORS_CONFIG, shareGroupAssignors.size()));
+
+ require(shareGroupMaxAssignmentIntervalMs >=
shareGroupMinAssignmentIntervalMs,
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ require(shareGroupAssignmentIntervalMs() >=
shareGroupMinAssignmentIntervalMs,
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ require(shareGroupAssignmentIntervalMs() <=
shareGroupMaxAssignmentIntervalMs,
+ String.format("%s must be less than or equal to %s",
+ SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
// Streams group configs validation.
require(streamsGroupMaxHeartbeatIntervalMs >=
streamsGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
@@ -562,6 +666,72 @@ public class GroupCoordinatorConfig {
require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
String.format("%s must be less than %s",
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+
+ require(streamsGroupMaxAssignmentIntervalMs >=
streamsGroupMinAssignmentIntervalMs,
+ String.format("%s must be greater than or equal to %s",
+ STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ require(streamsGroupAssignmentIntervalMs() >=
streamsGroupMinAssignmentIntervalMs,
+ String.format("%s must be greater than or equal to %s",
+ STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ require(streamsGroupAssignmentIntervalMs() <=
streamsGroupMaxAssignmentIntervalMs,
+ String.format("%s must be less than or equal to %s",
+ STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ }
+
+ /**
+ * Clamps dynamic group coordinator configs to within acceptable bounds.
+ * Out-of-range values are capped and a WARN log is emitted.
+ *
+ * @param props The full Kafka config containing properties to be clamped.
+ */
+ public static void clampDynamicConfigs(Map<String, String> props) {
+ // Parse configs but do not validate mins and maxes.
+ AbstractConfig groupCoordinatorConfig = new AbstractConfig(
+ GroupCoordinatorConfig.CONFIG_DEF,
+ props
+ );
+
+ clampDynamicIntConfig(props,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
groupCoordinatorConfig.getInt(CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG),
+
groupCoordinatorConfig.getInt(CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ clampDynamicIntConfig(props, SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
groupCoordinatorConfig.getInt(SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG),
+
groupCoordinatorConfig.getInt(SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ clampDynamicIntConfig(props,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
groupCoordinatorConfig.getInt(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG),
+
groupCoordinatorConfig.getInt(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ }
+
+ /**
+ * Clamp a config value to [min, max]. A WARN log is emitted on adjustment.
+ * No-op when the key is absent from props.
+ *
+ * @param props The properties to modify in place.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
+ * @param max The maximum allowed value (inclusive).
+ */
+ private static void clampDynamicIntConfig(
+ Map<String, String> props,
+ String key,
+ int min,
+ int max
+ ) {
+ Object rawValue = props.get(key);
+ if (rawValue == null) return;
+
+ int value = Integer.parseInt(rawValue.toString());
+ if (value < min) {
+ LOG.warn("The config '{}' has value {} which is below the " +
+ "allowed minimum {}. The effective value will be capped to
{}.",
+ key, value, min, min);
+ props.put(key, String.valueOf(min));
+ } else if (value > max) {
+ LOG.warn("The config '{}' has value {} which exceeds the " +
+ "allowed maximum {}. The effective value will be capped to
{}.",
+ key, value, max, max);
+ props.put(key, String.valueOf(max));
+ }
}
public static GroupCoordinatorConfig fromProps(
@@ -918,6 +1088,34 @@ public class GroupCoordinatorConfig {
return consumerGroupMaxHeartbeatIntervalMs;
}
+ /**
+ * The interval between assignment updates for a consumer group.
+ */
+ public int consumerGroupAssignmentIntervalMs() {
+ return
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+
+ /**
+ * The minimum interval between assignment updates for a consumer group.
+ */
+ public int consumerGroupMinAssignmentIntervalMs() {
+ return consumerGroupMinAssignmentIntervalMs;
+ }
+
+ /**
+ * The maximum interval between assignment updates for a consumer group.
+ */
+ public int consumerGroupMaxAssignmentIntervalMs() {
+ return consumerGroupMaxAssignmentIntervalMs;
+ }
+
+ /**
+ * Whether to offload consumer group assignment to a group coordinator
background thread.
+ */
+ public boolean consumerGroupAssignorOffloadEnable() {
+ return
config.getBoolean(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ }
+
/**
* The consumer group regex batch refresh max interval in milliseconds.
*/
@@ -981,6 +1179,34 @@ public class GroupCoordinatorConfig {
return shareGroupAssignors;
}
+ /**
+ * The interval between assignment updates for a share group.
+ */
+ public int shareGroupAssignmentIntervalMs() {
+ return
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+
+ /**
+ * The minimum interval between assignment updates for a share group.
+ */
+ public int shareGroupMinAssignmentIntervalMs() {
+ return shareGroupMinAssignmentIntervalMs;
+ }
+
+ /**
+ * The maximum interval between assignment updates for a share group.
+ */
+ public int shareGroupMaxAssignmentIntervalMs() {
+ return shareGroupMaxAssignmentIntervalMs;
+ }
+
+ /**
+ * Whether to offload share group assignment to a group coordinator
background thread.
+ */
+ public boolean shareGroupAssignorOffloadEnable() {
+ return
config.getBoolean(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ }
+
/**
* The share group initialize retry interval.
*/
@@ -1057,4 +1283,32 @@ public class GroupCoordinatorConfig {
public int streamsGroupInitialRebalanceDelayMs() {
return streamsGroupInitialRebalanceDelayMs;
}
+
+ /**
+ * The interval between assignment updates for a streams group.
+ */
+ public int streamsGroupAssignmentIntervalMs() {
+ return
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+
+ /**
+ * The minimum interval between assignment updates for a streams group.
+ */
+ public int streamsGroupMinAssignmentIntervalMs() {
+ return streamsGroupMinAssignmentIntervalMs;
+ }
+
+ /**
+ * The maximum interval between assignment updates for a streams group.
+ */
+ public int streamsGroupMaxAssignmentIntervalMs() {
+ return streamsGroupMaxAssignmentIntervalMs;
+ }
+
+ /**
+ * Whether to offload streams group assignment to a group coordinator
background thread.
+ */
+ public boolean streamsGroupAssignorOffloadEnable() {
+ return
config.getBoolean(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ffcd909a2b9..4141e441aee 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8697,6 +8697,22 @@ public class GroupMetadataManager {
.orElse(config.consumerGroupHeartbeatIntervalMs());
}
+ /**
+ * Get the interval between assignment updates of the provided consumer
group.
+ */
+ // package private for testing
+ int consumerGroupAssignmentIntervalMs(String groupId) {
+ return config.consumerGroupAssignmentIntervalMs();
+ }
+
+ /**
+ * Get whether to offload assignment for the provided consumer group.
+ */
+ // package private for testing
+ boolean consumerGroupAssignorOffloadEnable(String groupId) {
+ return config.consumerGroupAssignorOffloadEnable();
+ }
+
/**
* Get the session timeout of the provided share group.
*/
@@ -8715,6 +8731,22 @@ public class GroupMetadataManager {
.orElse(config.shareGroupHeartbeatIntervalMs());
}
+ /**
+ * Get the interval between assignment updates of the provided share group.
+ */
+ // package private for testing
+ int shareGroupAssignmentIntervalMs(String groupId) {
+ return config.shareGroupAssignmentIntervalMs();
+ }
+
+ /**
+ * Get whether to offload assignment for the provided share group.
+ */
+ // package private for testing
+ boolean shareGroupAssignorOffloadEnable(String groupId) {
+ return config.shareGroupAssignorOffloadEnable();
+ }
+
/**
* Get the session timeout of the provided streams group.
*/
@@ -8733,6 +8765,22 @@ public class GroupMetadataManager {
.orElse(config.streamsGroupHeartbeatIntervalMs());
}
+ /**
+ * Get the interval between assignment updates of the provided streams
group.
+ */
+ // package private for testing
+ int streamsGroupAssignmentIntervalMs(String groupId) {
+ return config.streamsGroupAssignmentIntervalMs();
+ }
+
+ /**
+ * Get whether to offload assignment for the provided streams group.
+ */
+ // package private for testing
+ boolean streamsGroupAssignorOffloadEnable(String groupId) {
+ return config.streamsGroupAssignorOffloadEnable();
+ }
+
/**
* Get the initial rebalance delay of the provided streams group.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 0208e627d6a..7319fcebdf4 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -199,8 +199,20 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
666);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
111);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
222);
+
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
500);
+
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
400);
+
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
600);
+
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
false);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
15 * 60 * 1000);
+
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
250);
+
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
150);
+
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
350);
+
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
false);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
5000);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
125);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
25);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
225);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
false);
configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 2 *
1024 * 1024);
GroupCoordinatorConfig config = createConfig(configs);
@@ -231,8 +243,20 @@ public class GroupCoordinatorConfigTest {
assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
+ assertEquals(500, config.consumerGroupAssignmentIntervalMs());
+ assertEquals(400, config.consumerGroupMinAssignmentIntervalMs());
+ assertEquals(600, config.consumerGroupMaxAssignmentIntervalMs());
+ assertEquals(false, config.consumerGroupAssignorOffloadEnable());
assertEquals(15 * 60 * 1000,
config.consumerGroupRegexRefreshIntervalMs());
+ assertEquals(250, config.shareGroupAssignmentIntervalMs());
+ assertEquals(150, config.shareGroupMinAssignmentIntervalMs());
+ assertEquals(350, config.shareGroupMaxAssignmentIntervalMs());
+ assertEquals(false, config.shareGroupAssignorOffloadEnable());
assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
+ assertEquals(125, config.streamsGroupAssignmentIntervalMs());
+ assertEquals(25, config.streamsGroupMinAssignmentIntervalMs());
+ assertEquals(225, config.streamsGroupMaxAssignmentIntervalMs());
+ assertEquals(false, config.streamsGroupAssignorOffloadEnable());
assertEquals(2 * 1024 * 1024, config.cachedBufferMaxBytes());
}
@@ -337,6 +361,45 @@ public class GroupCoordinatorConfigTest {
assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
}
+ @Test
+ public void testClampDynamicConfigs() {
+ Map<String, String> consumerProps = Map.of(
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
"30000",
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000"
+ );
+ testClampDynamicConfig(consumerProps,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
"15000");
+ testClampDynamicConfig(consumerProps,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "45000",
"45000");
+ testClampDynamicConfig(consumerProps,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000",
"90000");
+
+ Map<String, String> shareProps = Map.of(
+
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
+
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000"
+ );
+ testClampDynamicConfig(shareProps,
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
"15000");
+ testClampDynamicConfig(shareProps,
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "45000",
"45000");
+ testClampDynamicConfig(shareProps,
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000",
"90000");
+
+ Map<String, String> streamsProps = Map.of(
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
+
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000"
+ );
+ testClampDynamicConfig(streamsProps,
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "30000",
"15000");
+ testClampDynamicConfig(streamsProps,
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "45000",
"45000");
+ testClampDynamicConfig(streamsProps,
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "60000",
"90000");
+ }
+
+ private void testClampDynamicConfig(
+ Map<String, String> props,
+ String configName,
+ String expectedValue,
+ String value
+ ) {
+ props = new HashMap<>(props);
+ props.put(configName, value);
+ GroupCoordinatorConfig.clampDynamicConfigs(props);
+ assertEquals(expectedValue, props.get(configName));
+ }
+
@Test
public void testAppendLingerMs() {
GroupCoordinatorConfig config =
createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG,
-1));