This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eae10c2e60c MINOR: Update share.coordinator.snapshot.update.records.per.snapshot to use range [0, 500] (#21291)
eae10c2e60c is described below

commit eae10c2e60c1fb9e9de223c3fd87c19f83915793
Author: majialong <[email protected]>
AuthorDate: Thu Jan 15 22:51:11 2026 +0800

    MINOR: Update share.coordinator.snapshot.update.records.per.snapshot to use range [0, 500] (#21291)
    
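    The actual range for
    `share.coordinator.snapshot.update.records.per.snapshot` is [0, 500]:
    the `ConfigDef` declared only `atLeast(0)`, while the upper bound was
    enforced separately by `validate()` in the `ShareCoordinatorConfig`
    constructor. This PR declares the range as `between(0, 500)` in the
    `ConfigDef` and removes the now-redundant `validate()` check, so the
    parameter definition is consistent with the actual behavior.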
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield
     <[email protected]>, Sushant Mahajan <[email protected]>
---
 .../kafka/coordinator/share/ShareCoordinatorConfig.java    | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index 7b6bbb1dc5c..30dc340322d 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.Utils;
 
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -30,6 +29,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
 
@@ -93,14 +93,14 @@ public class ShareCoordinatorConfig {
     public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
         CACHED_BUFFER_MAX_BYTES_CONFIG
     );
-    
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, 
STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_NUM_PARTITIONS_DOC)
         .define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, 
STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_REPLICATION_FACTOR_DOC)
         .define(STATE_TOPIC_MIN_ISR_CONFIG, SHORT, 
STATE_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_MIN_ISR_DOC)
         .define(STATE_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
STATE_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, 
STATE_TOPIC_SEGMENT_BYTES_DOC)
         .define(NUM_THREADS_CONFIG, INT, NUM_THREADS_DEFAULT, atLeast(1), 
MEDIUM, NUM_THREADS_DOC)
-        .define(SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, INT, 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DEFAULT, atLeast(0), MEDIUM, 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DOC)
+        .define(SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, INT, 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DEFAULT, between(0, 500), MEDIUM, 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DOC)
         .define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, 
atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
         .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) 
STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, 
STATE_TOPIC_COMPRESSION_CODEC_DOC)
         .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, 
atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
@@ -142,7 +142,6 @@ public class ShareCoordinatorConfig {
         pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
         coldPartitionSnapshotIntervalMs = 
config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
         this.config = config;
-        validate();
     }
 
     public int shareCoordinatorStateTopicNumPartitions() {
@@ -200,7 +199,7 @@ public class ShareCoordinatorConfig {
     public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
         return coldPartitionSnapshotIntervalMs;
     }
-    
+
     /**
      * The maximum buffer size that the share coordinator can cache.
      *
@@ -209,9 +208,4 @@ public class ShareCoordinatorConfig {
     public int shareCoordinatorCachedBufferMaxBytes() {
         return config.getInt(CACHED_BUFFER_MAX_BYTES_CONFIG);
     }
-
-    private void validate() {
-        Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && 
snapshotUpdateRecordsPerSnapshot <= 500,
-            String.format("%s must be between [0, 500]", 
SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG));
-    }
 }

Reply via email to