This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 51155f993e68e4a7a2a1c60e9d1e0cff86901b1e Author: Matthias J. Sax <[email protected]> AuthorDate: Thu Sep 5 07:27:15 2024 -0700 Add broker configs for streams group Adds broker configs for Streams group for the MVP. --- .../consumer/internals/AsyncKafkaConsumer.java | 1 + .../coordinator/group/GroupCoordinatorConfig.java | 51 ++++++++++++++++++++++ .../coordinator/group/GroupCoordinatorShard.java | 2 +- .../metrics/GroupCoordinatorMetricsShard.java | 6 +++ .../group/GroupCoordinatorConfigTest.java | 28 +++++++++++- .../kafka/server/config/AbstractKafkaConfig.java | 1 + 6 files changed, 87 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 807c900f6b9..65cea62ea64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -159,6 +159,7 @@ import static org.apache.kafka.common.utils.Utils.swallow; * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before. * This consumer implements the new consumer group protocol and is intended to be the default in coming releases. */ +@SuppressWarnings("this-escape") public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { private static final long NO_CURRENT_THREAD = -1L; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index f94b2cc706b..cf1aa60e52f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -162,6 +162,19 @@ public class GroupCoordinatorConfig { public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; + /** Streams group configs */ + public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms"; + public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; + public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol."; + + public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms"; + public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000; + public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members of a streams group."; + + public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas"; + public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0; + public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; + public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; @@ -248,6 +261,10 @@ public class GroupCoordinatorConfig { .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); + public static final ConfigDef STREAMS_GROUP_CONFIG_DEF = new ConfigDef() + .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC); /** * The timeout used to wait for a new member in milliseconds. @@ -286,6 +303,11 @@ public class GroupCoordinatorConfig { private final int shareGroupHeartbeatIntervalMs; private final int shareGroupMinHeartbeatIntervalMs; private final int shareGroupMaxHeartbeatIntervalMs; + // Streams group configurations + private final int streamsGroupSessionTimeoutMs; + private final int streamsGroupHeartbeatIntervalMs; + private final int streamsGroupNumStandbyReplicas; + public GroupCoordinatorConfig(AbstractConfig config) { this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); @@ -324,6 +346,10 @@ public class GroupCoordinatorConfig { this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG); + // Streams group configurations + this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG); + this.streamsGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG); // New group coordinator configs validation. require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, @@ -365,6 +391,10 @@ public class GroupCoordinatorConfig { require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs, String.format("%s must be less than %s", SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); + + // Streams group configs validation. + require(streamsGroupHeartbeatIntervalMs <= streamsGroupSessionTimeoutMs, + String.format("%s must be less than or equals to %s", STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)); } /** @@ -625,4 +655,25 @@ public class GroupCoordinatorConfig { public int shareGroupMaxHeartbeatIntervalMs() { return shareGroupMaxHeartbeatIntervalMs; } + + /** + * The streams group session timeout in milliseconds. + */ + public int streamsGroupSessionTimeoutMs() { + return streamsGroupSessionTimeoutMs; + } + + /** + * The streams group heartbeat interval in milliseconds. + */ + public int streamsGroupHeartbeatIntervalMs() { + return streamsGroupHeartbeatIntervalMs; + } + + /** + * The streams group number of standby replicas + */ + public int streamsGroupNumStandbyReplicas() { + return streamsGroupNumStandbyReplicas; + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 1809db63f13..6a2ad52ec3a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -135,7 +135,7 @@ import static org.apache.kafka.coordinator.group.Utils.messageOrNull; * 2) The replay methods which apply records to the hard state. Those are used in the request * handling as well as during the initial loading of the records from the partitions. */ -@SuppressWarnings("ClassFanOutComplexity") +@SuppressWarnings({"ClassFanOutComplexity", "CyclomaticComplexity"}) public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord> { public static class Builder implements CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java index 9a50c0a32cc..a2de5ea0d2c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java @@ -522,6 +522,9 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { break; case DEAD: incrementNumStreamsGroups(StreamsGroupState.DEAD); + break; + default: + throw new IllegalArgumentException("Unknown new state for streams group: " + newState); } } @@ -544,6 +547,9 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { break; case DEAD: decrementNumStreamsGroups(StreamsGroupState.DEAD); + break; + default: + throw new IllegalArgumentException("Unknown old state for streams group: " + newState); } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 4956acaf386..baf55529fa5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -42,7 +42,8 @@ public class GroupCoordinatorConfigTest { GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, - GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF); + GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF, + GroupCoordinatorConfig.STREAMS_GROUP_CONFIG_DEF); @Test public void testConfigs() { @@ -71,6 +72,9 @@ public class GroupCoordinatorConfigTest { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 73); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 42); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 2); GroupCoordinatorConfig config = createConfig(configs); @@ -99,6 +103,9 @@ public class GroupCoordinatorConfigTest { assertEquals(666, config.consumerGroupMaxSessionTimeoutMs()); assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs()); assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); + assertEquals(73, config.streamsGroupSessionTimeoutMs()); + assertEquals(42, config.streamsGroupHeartbeatIntervalMs()); + assertEquals(2, config.streamsGroupNumStandbyReplicas()); } @Test @@ -179,7 +186,26 @@ public class GroupCoordinatorConfigTest { configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000); configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms", + + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0); + assertEquals("Invalid value 0 for configuration group.streams.session.timeout.ms: Value must be at least 1", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 0); + assertEquals("Invalid value 0 for configuration group.streams.heartbeat.interval.ms: Value must be at least 1", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 42); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 43); + assertEquals("group.streams.heartbeat.interval.ms must be less than or equals to group.streams.session.timeout.ms", assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, -1); + assertEquals("Invalid value -1 for configuration group.streams.num.standby.replicas: Value must be at least 0", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 537b0869d52..e7c7c18a51b 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -55,6 +55,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF, + GroupCoordinatorConfig.STREAMS_GROUP_CONFIG_DEF, CleanerConfig.CONFIG_DEF, LogConfig.SERVER_CONFIG_DEF, ShareGroupConfig.CONFIG_DEF,
