This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 51155f993e68e4a7a2a1c60e9d1e0cff86901b1e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Sep 5 07:27:15 2024 -0700

    Add broker configs for streams group
    
    Adds the broker configs for streams groups needed for the MVP: session
    timeout, heartbeat interval, and number of standby replicas.
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  1 +
 .../coordinator/group/GroupCoordinatorConfig.java  | 51 ++++++++++++++++++++++
 .../coordinator/group/GroupCoordinatorShard.java   |  2 +-
 .../metrics/GroupCoordinatorMetricsShard.java      |  6 +++
 .../group/GroupCoordinatorConfigTest.java          | 28 +++++++++++-
 .../kafka/server/config/AbstractKafkaConfig.java   |  1 +
 6 files changed, 87 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 807c900f6b9..65cea62ea64 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -159,6 +159,7 @@ import static org.apache.kafka.common.utils.Utils.swallow;
  * This class should not be invoked directly; users should instead create a 
{@link KafkaConsumer} as before.
  * This consumer implements the new consumer group protocol and is intended to 
be the default in coming releases.
  */
+@SuppressWarnings("this-escape")
 public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index f94b2cc706b..cf1aa60e52f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -162,6 +162,19 @@ public class GroupCoordinatorConfig {
     public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 
15000;
     public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = 
"The maximum heartbeat interval for share group members.";
 
+    /** Streams group configs */
+    public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = 
"group.streams.session.timeout.ms";
+    public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+    public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The 
timeout to detect client failures when using the streams group protocol.";
+
+    public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.streams.heartbeat.interval.ms";
+    public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
+    public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The 
heartbeat interval given to the members of a streams group.";
+
+    public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = 
"group.streams.num.standby.replicas";
+    public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0;
+    public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The 
number of standby replicas for each task.";
+
     public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = 
"offset.metadata.max.bytes";
     public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
     public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum 
size for a metadata entry associated with an offset commit.";
@@ -248,6 +261,10 @@ public class GroupCoordinatorConfig {
             .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
             .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
             .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, 
SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, 
SHARE_GROUP_MAX_SIZE_DOC);
+    public static final ConfigDef STREAMS_GROUP_CONFIG_DEF =  new ConfigDef()
+            .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
+            .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+            .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
 
     /**
      * The timeout used to wait for a new member in milliseconds.
@@ -286,6 +303,11 @@ public class GroupCoordinatorConfig {
     private final int shareGroupHeartbeatIntervalMs;
     private final int shareGroupMinHeartbeatIntervalMs;
     private final int shareGroupMaxHeartbeatIntervalMs;
+    // Streams group configurations
+    private final int streamsGroupSessionTimeoutMs;
+    private final int streamsGroupHeartbeatIntervalMs;
+    private final int streamsGroupNumStandbyReplicas;
+
 
     public GroupCoordinatorConfig(AbstractConfig config) {
         this.numThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
@@ -324,6 +346,10 @@ public class GroupCoordinatorConfig {
         this.shareGroupMinHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
+        // Streams group configurations
+        this.streamsGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+        this.streamsGroupHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.streamsGroupNumStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
 
         // New group coordinator configs validation.
         require(consumerGroupMaxHeartbeatIntervalMs >= 
consumerGroupMinHeartbeatIntervalMs,
@@ -365,6 +391,10 @@ public class GroupCoordinatorConfig {
         require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
             String.format("%s must be less than %s",
                 SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+
+        // Streams group configs validation.
+        require(streamsGroupHeartbeatIntervalMs <= 
streamsGroupSessionTimeoutMs,
+                String.format("%s must be less than or equals to %s", 
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
     }
 
     /**
@@ -625,4 +655,25 @@ public class GroupCoordinatorConfig {
     public int shareGroupMaxHeartbeatIntervalMs() {
         return shareGroupMaxHeartbeatIntervalMs;
     }
+
+    /**
+     * The streams group session timeout in milliseconds.
+     */
+    public int streamsGroupSessionTimeoutMs() {
+        return streamsGroupSessionTimeoutMs;
+    }
+
+    /**
+     * The streams group heartbeat interval in milliseconds.
+     */
+    public int streamsGroupHeartbeatIntervalMs() {
+        return streamsGroupHeartbeatIntervalMs;
+    }
+
+    /**
+     * The number of standby replicas for each task of a streams group.
+     */
+    public int streamsGroupNumStandbyReplicas() {
+        return streamsGroupNumStandbyReplicas;
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 1809db63f13..6a2ad52ec3a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -135,7 +135,7 @@ import static 
org.apache.kafka.coordinator.group.Utils.messageOrNull;
  * 2) The replay methods which apply records to the hard state. Those are used 
in the request
  *    handling as well as during the initial loading of the records from the 
partitions.
  */
-@SuppressWarnings("ClassFanOutComplexity")
+@SuppressWarnings({"ClassFanOutComplexity", "CyclomaticComplexity"})
 public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord> {
 
     public static class Builder implements 
CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 9a50c0a32cc..a2de5ea0d2c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -522,6 +522,9 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
                     break;
                 case DEAD:
                     incrementNumStreamsGroups(StreamsGroupState.DEAD);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown new state for 
streams group: " + newState);
             }
         }
 
@@ -544,6 +547,9 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
                     break;
                 case DEAD:
                     decrementNumStreamsGroups(StreamsGroupState.DEAD);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown old state for 
streams group: " + oldState);
             }
         }
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 4956acaf386..baf55529fa5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -42,7 +42,8 @@ public class GroupCoordinatorConfigTest {
             GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
             GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
             GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
-            GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF);
+            GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,
+            GroupCoordinatorConfig.STREAMS_GROUP_CONFIG_DEF);
 
     @Test
     public void testConfigs() {
@@ -71,6 +72,9 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 666);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 111);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 222);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 73);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
42);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
2);
 
         GroupCoordinatorConfig config = createConfig(configs);
 
@@ -99,6 +103,9 @@ public class GroupCoordinatorConfigTest {
         assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
         assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
         assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
+        assertEquals(73, config.streamsGroupSessionTimeoutMs());
+        assertEquals(42, config.streamsGroupHeartbeatIntervalMs());
+        assertEquals(2, config.streamsGroupNumStandbyReplicas());
     }
 
     @Test
@@ -179,7 +186,26 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
50000);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
50000);
         assertEquals("group.share.heartbeat.interval.ms must be less than 
group.share.session.timeout.ms",
+        
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.session.timeout.ms: Value must be at least 1",
+                assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.heartbeat.interval.ms: Value must be at least 1",
+                assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 42);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
43);
+        assertEquals("group.streams.heartbeat.interval.ms must be less than or 
equals to group.streams.session.timeout.ms",
                 assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
-1);
+        assertEquals("Invalid value -1 for configuration 
group.streams.num.standby.replicas: Value must be at least 0",
+                assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
     }
 
     public static GroupCoordinatorConfig createGroupCoordinatorConfig(
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 537b0869d52..e7c7c18a51b 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -55,6 +55,7 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
             GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
             GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
             GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,
+            GroupCoordinatorConfig.STREAMS_GROUP_CONFIG_DEF,
             CleanerConfig.CONFIG_DEF,
             LogConfig.SERVER_CONFIG_DEF,
             ShareGroupConfig.CONFIG_DEF,

Reply via email to