This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 54fe0f0135f KAFKA-16368: Add a new constraint for segment.bytes to min 1MB for KIP-1030 (#18140)
54fe0f0135f is described below

commit 54fe0f0135f807959fd6a22c1db1b1d890561489
Author: Jason Taylor <jasta...@amazon.com>
AuthorDate: Thu Jan 16 15:07:00 2025 +0000

    KAFKA-16368: Add a new constraint for segment.bytes to min 1MB for KIP-1030 (#18140)
    
    Reviewers: Divij Vaidya <di...@amazon.com>
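
    The bound is enforced with Kafka's standard ConfigDef range validator
    (see the LogConfig.java hunk below). A minimal standalone sketch of the
    new floor; the class name and printed output are illustrative, not part
    of this commit:

        import org.apache.kafka.common.config.ConfigDef;
        import org.apache.kafka.common.config.ConfigException;

        public class SegmentBytesFloorSketch {
            public static void main(String[] args) {
                // Same bound as the changed define(): below 1 MiB is invalid.
                ConfigDef.Range atLeastOneMiB = ConfigDef.Range.atLeast(1024 * 1024);
                atLeastOneMiB.ensureValid("log.segment.bytes", 1024 * 1024); // passes
                try {
                    atLeastOneMiB.ensureValid("log.segment.bytes", 100);     // rejected
                } catch (ConfigException e) {
                    System.out.println("rejected: " + e.getMessage());
                }
            }
        }
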
---
 core/src/test/java/kafka/admin/DeleteTopicTest.java              | 4 +++-
 .../kafka/server/DynamicBrokerReconfigurationTest.scala          | 9 +++++----
 core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala      | 2 +-
 core/src/test/scala/unit/kafka/server/LogOffsetTest.scala        | 1 -
 docs/upgrade.html                                                | 4 ++++
 .../java/org/apache/kafka/storage/internals/log/LogConfig.java   | 2 +-
 6 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/test/java/kafka/admin/DeleteTopicTest.java b/core/src/test/java/kafka/admin/DeleteTopicTest.java
index c3f1efef7b5..be87e086f7f 100644
--- a/core/src/test/java/kafka/admin/DeleteTopicTest.java
+++ b/core/src/test/java/kafka/admin/DeleteTopicTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import scala.Option;
 import scala.jdk.javaapi.OptionConverters;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -238,7 +239,6 @@ public class DeleteTopicTest {
     @ClusterTest(serverProperties = {
         @ClusterConfigProperty(key = "log.cleaner.enable", value = "true"),
         @ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"),
-        @ClusterConfigProperty(key = "log.segment.bytes", value = "100"),
         @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "1048577")
     })
     public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception {
@@ -251,6 +251,8 @@ public class DeleteTopicTest {
                 "Replicas for topic test not created.");
             UnifiedLog log = server.logManager().getLog(topicPartition, false).get();
             writeDups(100, 3, log);
+            // force roll the segment so that cleaner can work on it
+            server.logManager().getLog(topicPartition, false).get().roll(Option.empty());
             // wait for cleaner to clean
             server.logManager().cleaner().awaitCleaned(topicPartition, 0, 60000);
             admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
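
(Note on the two added lines above: the log cleaner only compacts rolled,
inactive segments, never the active one. The removed log.segment.bytes=100
property used to make nearly every append roll a new segment; with the 1 MB
floor the duplicate records would stay in the active segment, so the test
now rolls it explicitly. Annotated excerpt:)

    // The cleaner skips the active segment; force a roll so the duplicates
    // land in a sealed segment the cleaner is allowed to compact.
    server.logManager().getLog(topicPartition, false).get().roll(Option.empty());
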
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 26c65f46603..49a0ebc21f4 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -128,7 +128,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested")
       props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN")
       props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
-      props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update
+      props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") // low value to test log rolling on config update
       props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
       props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
       props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
@@ -587,7 +587,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val props = new Properties
     val logIndexSizeMaxBytes = "100000"
     val logRetentionMs = TimeUnit.DAYS.toMillis(1)
-    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")
+    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576")
     props.put(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(2).toString)
     props.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(1).toString)
     props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, logIndexSizeMaxBytes)
@@ -609,11 +609,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString)
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, "1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, "1000")
-    reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000"))
+    reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576"))
 
     // Verify that all broker defaults have been updated
     servers.foreach { server =>
       props.forEach { (k, v) =>
+        TestUtils.waitUntilTrue(() => server.config.originals.get(k) != null, "Configs not present")
         assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k")
       }
     }
@@ -624,7 +625,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       "Config not updated in LogManager")
 
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
-    TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
+    TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated")
     val KafkaConfigToLogConfigName: Map[String, String] =
       ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
     props.asScala.foreach { case (k, v) =>
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 971018ffc19..31c192ff9ef 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1118,7 +1118,7 @@ class KafkaConfigTest {
         case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG =>
           assertDynamic(kafkaConfigProp, "5", () => config.zstdCompressionLevel)
         case TopicConfig.SEGMENT_BYTES_CONFIG =>
-          assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes)
+          assertDynamic(kafkaConfigProp, 1048576, () => config.logSegmentBytes)
         case TopicConfig.SEGMENT_MS_CONFIG =>
           assertDynamic(kafkaConfigProp, 10001L, () => config.logRollTimeMillis)
         case TopicConfig.DELETE_RETENTION_MS_CONFIG =>
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index dc3e7d7c6ab..efb057bd1cb 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -52,7 +52,6 @@ class LogOffsetTest extends BaseRequestTest {
     props.put("num.partitions", "20")
     props.put("log.retention.hours", "10")
     props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
-    props.put("log.segment.bytes", "140")
   }
 
   @ParameterizedTest
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 8cb91d7b248..1be5e1836c7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -111,6 +111,10 @@
                             The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10.
                             See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
                         </li>
+                        <li>
+                            The minimum value of <code>segment.bytes/log.segment.bytes</code> has changed from 14 bytes to 1MB.
+                            See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
+                        </li>
                     </ul>
                 </li>
                 <li><b>MirrorMaker</b>
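
(The upgrade note above is operator-visible: lowering segment.bytes below the
new floor should now fail broker-side validation. A hedged sketch against a
local broker; the bootstrap address and topic name are assumptions, and the
exact exception wrapped in ExecutionException depends on the broker's
validation path:)

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class SegmentBytesAlterSketch {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed address
            try (Admin admin = Admin.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test"); // assumed topic
                AlterConfigOp setTooSmall =
                    new AlterConfigOp(new ConfigEntry("segment.bytes", "100"), AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> update = Map.of(topic, List.of(setTooSmall));
                admin.incrementalAlterConfigs(update).all().get();
            } catch (ExecutionException e) {
                // Expected after this change: 100 < 1048576 is rejected.
                System.out.println("rejected: " + e.getCause());
            }
        }
    }
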
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index e8935249261..f4294329f25 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -159,7 +159,7 @@ public class LogConfig extends AbstractConfig {
             .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
             .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
             .define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
-            .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
+            .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
 
             .define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
             .define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)
