This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new ad89f9edc0d MINOR: Rearrange configs in GroupCoordinatorConfigs
(#18970)
ad89f9edc0d is described below
commit ad89f9edc0d19b78fc8f57397826a6b4e298d4ab
Author: David Jacot <[email protected]>
AuthorDate: Fri Feb 21 13:20:58 2025 +0100
MINOR: Rearrange configs in GroupCoordinatorConfigs (#18970)
I was looking into GroupCoordinatorConfig to review the configurations that
we will ship with Apache Kafka 4.0 and found it to be rather disorganised.
This patch cleans up the formatting and regroups related configurations.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../coordinator/group/GroupCoordinatorConfig.java | 238 +++++++++++----------
.../group/GroupCoordinatorConfigTest.java | 2 +-
.../group/GroupMetadataManagerTestContext.java | 2 +-
.../group/modern/share/ShareGroupConfigTest.java | 2 +-
.../kafka/server/config/AbstractKafkaConfig.java | 2 +-
5 files changed, 132 insertions(+), 114 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 4e335c3bb81..fb7c5444230 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -56,24 +56,9 @@ import static org.apache.kafka.common.utils.Utils.require;
* Using local variable is advantageous as it avoids the overhead of
repeatedly looking up these configurations in AbstractConfig.
*/
public class GroupCoordinatorConfig {
- /** ********* Group coordinator configuration ***********/
- public static final String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG =
"group.min.session.timeout.ms";
- public static final String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum
allowed session timeout for registered consumers. Shorter timeouts result in
quicker failure detection at the cost of more frequent consumer heartbeating,
which can overwhelm broker resources.";
- public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
-
- public static final String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG =
"group.max.session.timeout.ms";
- public static final String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum
allowed session timeout for registered consumers. Longer timeouts give
consumers more time to process messages in between heartbeats at the cost of a
longer time to detect failures.";
- public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000;
-
- public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG =
"group.initial.rebalance.delay.ms";
- public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The
amount of time the group coordinator will wait for more consumers to join a new
group before performing the first rebalance. A longer delay means potentially
fewer rebalances, but increases the time until processing begins.";
- public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
-
- public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size";
- public static final String GROUP_MAX_SIZE_DOC = "The maximum number of
consumers that a single consumer group can accommodate.";
- public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
-
- /** New group coordinator configs */
+ ///
+ /// Group coordinator configs
+ ///
public static final String NEW_GROUP_COORDINATOR_ENABLE_CONFIG =
"group.coordinator.new.enable";
public static final String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the
new group coordinator.";
public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = true;
@@ -94,7 +79,79 @@ public class GroupCoordinatorConfig {
public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number
of threads used by the group coordinator.";
public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 4;
- /** Consumer group configs */
+ public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG =
"offsets.load.buffer.size";
+ public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+ public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for
reading from the offsets segments when loading group metadata " +
+ " into the cache (soft-limit, overridden if records are too large).";
+
+ public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG =
"offsets.commit.timeout.ms";
+ public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000;
+ public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit
will be delayed until all replicas for the offsets topic receive the commit " +
+ "or this timeout is reached. This is similar to the producer request
timeout. This is applied to all the writes made by the coordinator.";
+
+ public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG =
"offsets.topic.num.partitions";
+ public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
+ public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of
partitions for the offset commit topic (should not change after deployment).";
+
+ public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG =
"offsets.topic.segment.bytes";
+ public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 *
1024;
+ public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets
topic segment bytes should be kept relatively small in order to facilitate " +
+ "faster log compaction and cache loads.";
+
+ public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG =
"offsets.topic.replication.factor";
+ public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+ public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The
replication factor for the offsets topic (set higher to ensure availability). "
+
+ "Internal topic creation will fail until the cluster size meets this
replication factor requirement.";
+
+ public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG =
"offsets.topic.compression.codec";
+ public static final CompressionType
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
+ public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC =
"Compression codec for the offsets topic - compression may be used to achieve
\"atomic\" commits.";
+
+ ///
+ /// Offset configs
+ ///
+ public static final String OFFSET_METADATA_MAX_SIZE_CONFIG =
"offset.metadata.max.bytes";
+ public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
+ public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum
size for a metadata entry associated with an offset commit.";
+
+ public static final String OFFSETS_RETENTION_MINUTES_CONFIG =
"offsets.retention.minutes";
+ public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
+ public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed
consumers, committed offset of a specific partition will be expired and
discarded when " +
+ "1) this retention period has elapsed after the consumer group loses
all its consumers (i.e. becomes empty); " +
+ "2) this retention period has elapsed since the last time an offset is
committed for the partition and the group is no longer subscribed to the
corresponding topic. " +
+ "For standalone consumers (using manual assignment), offsets will be
expired after this retention period has elapsed since the time of last commit.
" +
+ "Note that when a group is deleted via the delete-group request, its
committed offsets will also be deleted without extra retention period; " +
+ "also when a topic is deleted via the delete-topic request, upon
propagated metadata update any group's committed offsets for that topic will
also be deleted without extra retention period.";
+
+ public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG =
"offsets.retention.check.interval.ms";
+ public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT =
600000L;
+ public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC =
"Frequency at which to check for stale offsets";
+
+ ///
+ /// Classic group configs
+ ///
+ public static final String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG =
"group.min.session.timeout.ms";
+ public static final String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum
allowed session timeout for registered consumers. Shorter timeouts result in " +
+ "quicker failure detection at the cost of more frequent consumer
heartbeating, which can overwhelm broker resources.";
+ public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
+
+ public static final String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG =
"group.max.session.timeout.ms";
+ public static final String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum
allowed session timeout for registered consumers. Longer timeouts give
consumers " +
+ "more time to process messages in between heartbeats at the cost of a
longer time to detect failures.";
+ public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000;
+
+ public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG =
"group.initial.rebalance.delay.ms";
+ public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The
amount of time the group coordinator will wait for more consumers to join a new
group " +
+ "before performing the first rebalance. A longer delay means
potentially fewer rebalances, but increases the time until processing begins.";
+ public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
+
+ public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size";
+ public static final String GROUP_MAX_SIZE_DOC = "The maximum number of
consumers that a single consumer group can accommodate.";
+ public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+ ///
+ /// Consumer group configs
+ ///
public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG =
"group.consumer.session.timeout.ms";
public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The
timeout to detect client failures when using the consumer group protocol.";
public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
@@ -121,9 +178,9 @@ public class GroupCoordinatorConfig {
public static final String CONSUMER_GROUP_MAX_SIZE_CONFIG =
"group.consumer.max.size";
public static final String CONSUMER_GROUP_MAX_SIZE_DOC = "The maximum
number of consumers " +
- "that a single consumer group can accommodate. This value will
only impact groups under " +
- "the CONSUMER group protocol. To configure the max group size when
using the CLASSIC " +
- "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead.";
+ "that a single consumer group can accommodate. This value will only
impact groups under " +
+ "the CONSUMER group protocol. To configure the max group size when
using the CLASSIC " +
+ "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead.";
public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT =
Integer.MAX_VALUE;
@@ -142,14 +199,17 @@ public class GroupCoordinatorConfig {
public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG =
"group.consumer.migration.policy";
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT =
ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString();
- public static final String CONSUMER_GROUP_MIGRATION_POLICY_DOC = "The
config that enables converting the non-empty classic group using the consumer
embedded protocol to the non-empty consumer group using the consumer group
protocol and vice versa; " +
- "conversions of empty groups in both directions are always enabled
regardless of this policy. " +
- ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from
classic group to consumer group and downgrade from consumer group to classic
group are enabled, " +
- ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from
classic group to consumer group is enabled, " +
- ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from
consumer group to classic group is enabled, " +
- ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor
downgrade is enabled.";
-
- /** Share group configs */
+ public static final String CONSUMER_GROUP_MIGRATION_POLICY_DOC = "The
config that enables converting the non-empty classic group using the consumer
embedded protocol " +
+ "to the non-empty consumer group using the consumer group protocol and
vice versa; " +
+ "conversions of empty groups in both directions are always enabled
regardless of this policy. " +
+ ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from
classic group to consumer group and downgrade from consumer group to classic
group are enabled, " +
+ ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic
group to consumer group is enabled, " +
+ ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from
consumer group to classic group is enabled, " +
+ ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor
downgrade is enabled.";
+
+ ///
+ /// Share group configs
+ ///
public static final String SHARE_GROUP_MAX_SIZE_CONFIG =
"group.share.max.size";
public static final int SHARE_GROUP_MAX_SIZE_DEFAULT = 200;
public static final String SHARE_GROUP_MAX_SIZE_DOC = "The maximum number
of members that a single share group can accommodate.";
@@ -178,92 +238,50 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT =
15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC =
"The maximum heartbeat interval for share group members.";
- public static final String OFFSET_METADATA_MAX_SIZE_CONFIG =
"offset.metadata.max.bytes";
- public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
- public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum
size for a metadata entry associated with an offset commit.";
-
- public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG =
"offsets.load.buffer.size";
- public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
- public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for
reading from the offsets segments when loading offsets into the cache
(soft-limit, overridden if records are too large).";
-
- public static final String OFFSETS_RETENTION_MINUTES_CONFIG =
"offsets.retention.minutes";
- public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
- public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed
consumers, committed offset of a specific partition will be expired and
discarded when 1) this retention period has elapsed after the consumer group
loses all its consumers (i.e. becomes empty); " +
- "2) this retention period has elapsed since the last time an
offset is committed for the partition and the group is no longer subscribed to
the corresponding topic. " +
- "For standalone consumers (using manual assignment), offsets will
be expired after this retention period has elapsed since the time of last
commit. " +
- "Note that when a group is deleted via the delete-group request,
its committed offsets will also be deleted without extra retention period; " +
- "also when a topic is deleted via the delete-topic request, upon
propagated metadata update any group's committed offsets for that topic will
also be deleted without extra retention period.";
-
- public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG =
"offsets.retention.check.interval.ms";
- public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT =
600000L;
- public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC =
"Frequency at which to check for stale offsets";
-
- public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG =
"offsets.topic.num.partitions";
- public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
- public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of
partitions for the offset commit topic (should not change after deployment).";
-
- public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG =
"offsets.topic.segment.bytes";
- public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 *
1024;
- public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets
topic segment bytes should be kept relatively small in order to facilitate
faster log compaction and cache loads.";
-
- public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG =
"offsets.topic.replication.factor";
- public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
- public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The
replication factor for the offsets topic (set higher to ensure availability). "
+
- "Internal topic creation will fail until the cluster size meets
this replication factor requirement.";
-
- public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG =
"offsets.topic.compression.codec";
- public static final CompressionType
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
- public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC =
"Compression codec for the offsets topic - compression may be used to achieve
\"atomic\" commits.";
-
- public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG =
"offsets.commit.timeout.ms";
- public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000;
- public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit
will be delayed until all replicas for the offsets topic receive the commit " +
- "or this timeout is reached. This is similar to the producer
request timeout.";
-
public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new
ConfigDef()
- .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM,
GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
- .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT,
atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC);
-
- public static final ConfigDef NEW_GROUP_CONFIG_DEF = new ConfigDef()
- .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST,
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
-
ConfigDef.ValidList.in(Group.GroupType.documentValidValues()), MEDIUM,
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
- .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT,
GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH,
GROUP_COORDINATOR_NUM_THREADS_DOC)
- .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT,
GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM,
GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
- // Internal configuration used by integration and system tests.
- .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN,
NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM,
NEW_GROUP_COORDINATOR_ENABLE_DOC);
+ .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST,
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
+ ConfigDef.ValidList.in(Group.GroupType.documentValidValues()),
MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
+ .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT,
GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH,
GROUP_COORDINATOR_NUM_THREADS_DOC)
+ .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT,
GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM,
GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
+ .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT,
OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH,
OFFSET_COMMIT_TIMEOUT_MS_DOC)
+ .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT,
OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
OFFSETS_LOAD_BUFFER_SIZE_DOC)
+ .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT,
OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
+ .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT,
OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_PARTITIONS_DOC)
+ .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
+ .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int)
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH,
OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
+ // Internal configuration used by integration and system tests.
+ .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN,
NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM,
NEW_GROUP_COORDINATOR_ENABLE_DOC);
public static final ConfigDef OFFSET_MANAGEMENT_CONFIG_DEF = new
ConfigDef()
- .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT,
OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
- .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT,
OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
OFFSETS_LOAD_BUFFER_SIZE_DOC)
- .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT,
OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
- .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT,
OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_PARTITIONS_DOC)
- .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
- .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int)
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH,
OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
- .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT,
OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_MINUTES_DOC)
- .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
- .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT,
OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH,
OFFSET_COMMIT_TIMEOUT_MS_DOC);
+ .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT,
OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
+ .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT,
OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_MINUTES_DOC)
+ .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC);
+
+ public static final ConfigDef CLASSIC_GROUP_CONFIG_DEF = new ConfigDef()
+ .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM,
GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+ .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT,
atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC);
public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
- .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
- .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT,
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SIZE_DOC)
- .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST,
CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
- .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
+ .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT,
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SIZE_DOC)
+ .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST,
CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
+ .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef()
- .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
- .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT,
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM,
SHARE_GROUP_MAX_SIZE_DOC);
+ .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT,
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM,
SHARE_GROUP_MAX_SIZE_DOC);
/**
* The timeout used to wait for a new member in milliseconds.
@@ -390,8 +408,8 @@ public class GroupCoordinatorConfig {
new AbstractConfig(
Utils.mergeConfigs(List.of(
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
+ GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
)),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 03887a017ab..66f23a7c01d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -46,8 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupCoordinatorConfigTest {
private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS =
List.of(
+ GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 84e203ccfb3..42cdd9a5061 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -149,8 +149,8 @@ public class GroupMetadataManagerTestContext {
return new GroupCoordinatorConfigContext(
new AbstractConfig(
Utils.mergeConfigs(List.of(
+ GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index aee1610a77f..94aab9eded2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -145,6 +145,6 @@ public class ShareGroupConfigTest {
private static ShareGroupConfig createConfig(Map<String, Object> configs) {
return new ShareGroupConfig(
- new
AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF,
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF)), configs, false));
+ new
AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF,
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF)), configs, false));
}
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 918534fce7a..df38866d4f5 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -49,8 +49,8 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
KRaftConfigs.CONFIG_DEF,
SocketServerConfigs.CONFIG_DEF,
ReplicationConfigs.CONFIG_DEF,
+ GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,