This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eae10c2e60c MINOR: Update
share.coordinator.snapshot.update.records.per.snapshot to use range [0, 500]
(#21291)
eae10c2e60c is described below
commit eae10c2e60c1fb9e9de223c3fd87c19f83915793
Author: majialong <[email protected]>
AuthorDate: Thu Jan 15 22:51:11 2026 +0800
MINOR: Update share.coordinator.snapshot.update.records.per.snapshot to use
range [0, 500] (#21291)
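The effective range for
`share.coordinator.snapshot.update.records.per.snapshot` is [0, 500]:
the `ConfigDef` declared only `atLeast(0)`, while the upper bound of 500
was enforced by a separate `validate()` check.
This PR declares the range as `between(0, 500)` in the `ConfigDef`, so the
parameter definition is consistent with the actual behavior, and removes
the now-redundant `validate()` method.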
Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield
<[email protected]>, Sushant Mahajan <[email protected]>
---
.../kafka/coordinator/share/ShareCoordinatorConfig.java | 14 ++++----------
1 file changed, 4 insertions(+), 10 deletions(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index 7b6bbb1dc5c..30dc340322d 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.Utils;
import java.util.Optional;
import java.util.OptionalInt;
@@ -30,6 +29,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
@@ -93,14 +93,14 @@ public class ShareCoordinatorConfig {
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CACHED_BUFFER_MAX_BYTES_CONFIG
);
-
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT,
STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH,
STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT,
STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH,
STATE_TOPIC_REPLICATION_FACTOR_DOC)
.define(STATE_TOPIC_MIN_ISR_CONFIG, SHORT,
STATE_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_MIN_ISR_DOC)
.define(STATE_TOPIC_SEGMENT_BYTES_CONFIG, INT,
STATE_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
STATE_TOPIC_SEGMENT_BYTES_DOC)
.define(NUM_THREADS_CONFIG, INT, NUM_THREADS_DEFAULT, atLeast(1),
MEDIUM, NUM_THREADS_DOC)
- .define(SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, INT,
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DEFAULT, atLeast(0), MEDIUM,
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DOC)
+ .define(SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, INT,
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DEFAULT, between(0, 500), MEDIUM,
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DOC)
.define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT,
atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
.define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int)
STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH,
STATE_TOPIC_COMPRESSION_CODEC_DOC)
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT,
atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
@@ -142,7 +142,6 @@ public class ShareCoordinatorConfig {
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
coldPartitionSnapshotIntervalMs =
config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
this.config = config;
- validate();
}
public int shareCoordinatorStateTopicNumPartitions() {
@@ -200,7 +199,7 @@ public class ShareCoordinatorConfig {
public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
return coldPartitionSnapshotIntervalMs;
}
-
+
/**
* The maximum buffer size that the share coordinator can cache.
*
@@ -209,9 +208,4 @@ public class ShareCoordinatorConfig {
public int shareCoordinatorCachedBufferMaxBytes() {
return config.getInt(CACHED_BUFFER_MAX_BYTES_CONFIG);
}
-
- private void validate() {
- Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 &&
snapshotUpdateRecordsPerSnapshot <= 500,
- String.format("%s must be between [0, 500]",
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
- }
}