This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new db62c7cdffa KAFKA-19157: added group.share.max.share.sessions config (#19503) db62c7cdffa is described below commit db62c7cdffa19c9e98aaf5b8dd01431f25cf414e Author: Chirag Wadhwa <122860692+chirag-wadh...@users.noreply.github.com> AuthorDate: Thu Apr 17 17:47:58 2025 +0530 KAFKA-19157: added group.share.max.share.sessions config (#19503) This PR adds the config group.share.max.share.sessions to ShareGroupConfig Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../test/scala/unit/kafka/server/KafkaConfigTest.scala | 1 + .../group/modern/share/ShareGroupConfig.java | 17 +++++++++++++++-- .../group/modern/share/ShareGroupConfigTest.java | 2 ++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index f6819ac8080..cb8a1b1f869 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1030,6 +1030,7 @@ class KafkaConfigTest { case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") + case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string /** Streams groups configs */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java index ce2a8744ddb..58df34c2679 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; import static org.apache.kafka.common.config.ConfigDef.Type.INT; @@ -70,6 +71,10 @@ public class ShareGroupConfig { public static final int SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000; public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC = "The purge interval (in number of requests) of the share fetch request purgatory"; + public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG = "group.share.max.share.sessions"; + public static final int SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT = 2000; + public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_DOC = "The maximum number of share sessions per broker."; + public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG = "group.share.persister.class.name"; public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT = "org.apache.kafka.server.share.persister.DefaultStatePersister"; public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The class name of share persister for share group. The class should implement " + @@ -84,6 +89,7 @@ public class ShareGroupConfig { .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC) .define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC) + .define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC) .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC); private final boolean isShareGroupEnabled; @@ -94,11 +100,13 @@ public class ShareGroupConfig { private final int shareGroupMaxRecordLockDurationMs; private final int shareGroupMinRecordLockDurationMs; private final int shareFetchPurgatoryPurgeIntervalRequests; + private final int shareGroupMaxShareSessions; private final String shareGroupPersisterClassName; public ShareGroupConfig(AbstractConfig config) { - // Share groups are enabled in two cases: 1) The internal configuration to enable it is - // explicitly set; or 2) the share rebalance protocol is enabled. + // Share groups are enabled in two cases: + // 1. The internal configuration to enable it is explicitly set + // 2. the share rebalance protocol is enabled. Set<String> protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG) .stream().map(String::toUpperCase).collect(Collectors.toSet()); isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) || @@ -110,6 +118,7 @@ public class ShareGroupConfig { shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG); + shareGroupMaxShareSessions = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG); shareGroupPersisterClassName = config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG); validate(); } @@ -147,6 +156,10 @@ public class ShareGroupConfig { return shareFetchPurgatoryPurgeIntervalRequests; } + public int shareGroupMaxShareSessions() { + return shareGroupMaxShareSessions; + } + public String shareGroupPersisterClassName() { return shareGroupPersisterClassName; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java index 36b48d307a8..0a2f57b6ff6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java @@ -44,6 +44,7 @@ public class ShareGroupConfigTest { configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 15000); configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 60000); configs.put(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, 1000); + configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 1000); ShareGroupConfig config = createConfig(configs); @@ -55,6 +56,7 @@ public class ShareGroupConfigTest { assertEquals(15000, config.shareGroupMinRecordLockDurationMs()); assertEquals(60000, config.shareGroupMaxRecordLockDurationMs()); assertEquals(1000, config.shareFetchPurgatoryPurgeIntervalRequests()); + assertEquals(1000, config.shareGroupMaxShareSessions()); } @Test