This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db62c7cdffa KAFKA-19157: added group.share.max.share.sessions config 
(#19503)
db62c7cdffa is described below

commit db62c7cdffa19c9e98aaf5b8dd01431f25cf414e
Author: Chirag Wadhwa <122860692+chirag-wadh...@users.noreply.github.com>
AuthorDate: Thu Apr 17 17:47:58 2025 +0530

    KAFKA-19157: added group.share.max.share.sessions config (#19503)
    
    This PR adds the config group.share.max.share.sessions to
    ShareGroupConfig
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../test/scala/unit/kafka/server/KafkaConfigTest.scala  |  1 +
 .../group/modern/share/ShareGroupConfig.java            | 17 +++++++++++++++--
 .../group/modern/share/ShareGroupConfigTest.java        |  2 ++
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index f6819ac8080..cb8a1b1f869 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1030,6 +1030,7 @@ class KafkaConfigTest {
         case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG =>  
//ignore string
 
         /** Streams groups configs */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index ce2a8744ddb..58df34c2679 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -70,6 +71,10 @@ public class ShareGroupConfig {
     public static final int 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000;
     public static final String 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC = "The purge interval (in 
number of requests) of the share fetch request purgatory";
 
+    public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG = 
"group.share.max.share.sessions";
+    public static final int SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT = 2000;
+    public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_DOC = "The 
maximum number of share sessions per broker.";
+
     public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG = 
"group.share.persister.class.name";
     public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT = 
"org.apache.kafka.server.share.persister.DefaultStatePersister";
     public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The 
class name of share persister for share group. The class should implement " +
@@ -84,6 +89,7 @@ public class ShareGroupConfig {
             .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, 
SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, 
SHARE_GROUP_MAX_GROUPS_DOC)
             .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, 
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, 
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
             .define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
+            .define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, 
SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
             .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
 
     private final boolean isShareGroupEnabled;
@@ -94,11 +100,13 @@ public class ShareGroupConfig {
     private final int shareGroupMaxRecordLockDurationMs;
     private final int shareGroupMinRecordLockDurationMs;
     private final int shareFetchPurgatoryPurgeIntervalRequests;
+    private final int shareGroupMaxShareSessions;
     private final String shareGroupPersisterClassName;
 
     public ShareGroupConfig(AbstractConfig config) {
-        // Share groups are enabled in two cases: 1) The internal 
configuration to enable it is
-        // explicitly set; or 2) the share rebalance protocol is enabled.
+        // Share groups are enabled in two cases:
+        // 1. The internal configuration to enable it is explicitly set
+        // 2. the share rebalance protocol is enabled.
         Set<String> protocols = 
config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
             .stream().map(String::toUpperCase).collect(Collectors.toSet());
         isShareGroupEnabled = 
config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) ||
@@ -110,6 +118,7 @@ public class ShareGroupConfig {
         shareGroupMaxRecordLockDurationMs = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
         shareGroupMinRecordLockDurationMs = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
         shareFetchPurgatoryPurgeIntervalRequests = 
config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
+        shareGroupMaxShareSessions = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
         shareGroupPersisterClassName = 
config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
         validate();
     }
@@ -147,6 +156,10 @@ public class ShareGroupConfig {
         return shareFetchPurgatoryPurgeIntervalRequests;
     }
 
+    public int shareGroupMaxShareSessions() {
+        return shareGroupMaxShareSessions;
+    }
+
     public String shareGroupPersisterClassName() {
         return shareGroupPersisterClassName;
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 36b48d307a8..0a2f57b6ff6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -44,6 +44,7 @@ public class ShareGroupConfigTest {
         
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 
15000);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 
60000);
         
configs.put(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG,
 1000);
+        configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 
1000);
 
         ShareGroupConfig config = createConfig(configs);
 
@@ -55,6 +56,7 @@ public class ShareGroupConfigTest {
         assertEquals(15000, config.shareGroupMinRecordLockDurationMs());
         assertEquals(60000, config.shareGroupMaxRecordLockDurationMs());
         assertEquals(1000, config.shareFetchPurgatoryPurgeIntervalRequests());
+        assertEquals(1000, config.shareGroupMaxShareSessions());
     }
 
     @Test

Reply via email to