This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5138d3df82331960e5daf0736ac3d1957e2aaeb1 Author: Matthias J. Sax <[email protected]> AuthorDate: Thu Sep 5 07:27:15 2024 -0700 Add broker configs for streams group Adds broker configs for Streams group for the MVP. --- .../consumer/internals/AsyncKafkaConsumer.java | 1 + .../coordinator/group/GroupCoordinatorConfig.java | 51 ++++++++++++++++++++++ .../coordinator/group/GroupCoordinatorShard.java | 2 +- .../metrics/GroupCoordinatorMetricsShard.java | 6 +++ .../group/GroupCoordinatorConfigTest.java | 30 ++++++++++++- .../kafka/server/config/AbstractKafkaConfig.java | 1 + 6 files changed, 89 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 6f06493f706..24092368e9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -147,6 +147,7 @@ import static org.apache.kafka.common.utils.Utils.swallow; * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before. * This consumer implements the new consumer group protocol and is intended to be the default in coming releases. */ +@SuppressWarnings("this-escape") public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { private static final long NO_CURRENT_THREAD = -1L; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index a6ecc2319ad..5d2165fa873 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -162,6 +162,19 @@ public class GroupCoordinatorConfig { public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; + /** Streams group configs */ + public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms"; + public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; + public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol."; + + public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms"; + public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000; + public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members of a streams group."; + + public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas"; + public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0; + public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; + public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; @@ -246,6 +259,10 @@ public class GroupCoordinatorConfig { .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); + public static final ConfigDef STREAMS_GROUP_CONFIG_DEF = new ConfigDef() + .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC); /** * The timeout used to wait for a new member in milliseconds. @@ -284,6 +301,11 @@ public class GroupCoordinatorConfig { private final int shareGroupHeartbeatIntervalMs; private final int shareGroupMinHeartbeatIntervalMs; private final int shareGroupMaxHeartbeatIntervalMs; + // Streams group configurations + private final int streamsGroupSessionTimeoutMs; + private final int streamsGroupHeartbeatIntervalMs; + private final int streamsGroupNumStandbyReplicas; + public GroupCoordinatorConfig(AbstractConfig config) { this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); @@ -322,6 +344,10 @@ public class GroupCoordinatorConfig { this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG); + // Streams group configurations + this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG); + this.streamsGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG); + this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG); // New group coordinator configs validation. require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, @@ -357,6 +383,10 @@ public class GroupCoordinatorConfig { require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs, String.format("%s must be less than or equals to %s", SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + + // Streams group configs validation. + require(streamsGroupHeartbeatIntervalMs <= streamsGroupSessionTimeoutMs, + String.format("%s must be less than or equals to %s", STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)); } /** @@ -608,4 +638,25 @@ public class GroupCoordinatorConfig { public int shareGroupMaxHeartbeatIntervalMs() { return shareGroupMaxHeartbeatIntervalMs; } + + /** + * The streams group session timeout in milliseconds. + */ + public int streamsGroupSessionTimeoutMs() { + return streamsGroupSessionTimeoutMs; + } + + /** + * The streams group heartbeat interval in milliseconds. + */ + public int streamsGroupHeartbeatIntervalMs() { + return streamsGroupHeartbeatIntervalMs; + } + + /** + * The streams group number of standby replicas + */ + public int streamsGroupNumStandbyReplicas() { + return streamsGroupNumStandbyReplicas; + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 167415f5a82..dca52fa48de 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -132,7 +132,7 @@ import static org.apache.kafka.coordinator.group.Utils.messageOrNull; * 2) The replay methods which apply records to the hard state. Those are used in the request * handling as well as during the initial loading of the records from the partitions. */ -@SuppressWarnings("ClassFanOutComplexity") +@SuppressWarnings({"ClassFanOutComplexity", "CyclomaticComplexity"}) public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord> { public static class Builder implements CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java index 301eb2429f2..542a740ee07 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java @@ -585,6 +585,9 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { break; case DEAD: incrementNumStreamsGroups(StreamsGroupState.DEAD); + break; + default: + throw new IllegalArgumentException("Unknown new state for streams group: " + newState); } } @@ -607,6 +610,9 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { break; case DEAD: decrementNumStreamsGroups(StreamsGroupState.DEAD); + break; + default: + throw new IllegalArgumentException("Unknown old state for streams group: " + newState); } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 457fa16b80f..cc4d6197171 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -43,7 +43,8 @@ public class GroupCoordinatorConfigTest { GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, - GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF); + GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF, + GroupCoordinatorConfig.STREAMS_GROUP_CONFIG_DEF); @Test public void testConfigs() { @@ -72,6 +73,9 @@ public class GroupCoordinatorConfigTest { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 73); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 42); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 2); GroupCoordinatorConfig config = createConfig(configs); @@ -100,6 +104,9 @@ public class GroupCoordinatorConfigTest { assertEquals(666, config.consumerGroupMaxSessionTimeoutMs()); assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs()); assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); + assertEquals(73, config.streamsGroupSessionTimeoutMs()); + assertEquals(42, config.streamsGroupHeartbeatIntervalMs()); + assertEquals(2, config.streamsGroupNumStandbyReplicas()); } @Test @@ -165,6 +172,27 @@ public class GroupCoordinatorConfigTest { configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, -100); assertEquals("Unknown compression type id: -100", assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0); + assertEquals("Invalid value 0 for configuration group.streams.session.timeout.ms: Value must be at least 1", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 0); + assertEquals("Invalid value 0 for configuration group.streams.heartbeat.interval.ms: Value must be at least 1", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 42); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 43); + assertEquals("group.streams.heartbeat.interval.ms must be less than or equals to group.streams.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, -1); + assertEquals("Invalid value -1 for configuration group.streams.num.standby.replicas: Value must be at least 0", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index c9fcd0c13b2..d8527415da3 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -54,6 +54,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF, + GroupCoordinatorConfig.STREAMS_GROUP_CONFIG_DEF, CleanerConfig.CONFIG_DEF, LogConfig.SERVER_CONFIG_DEF, ShareGroupConfig.CONFIG_DEF,
