This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 54fe0f0135f KAFKA-16368: Add a new constraint for segment.bytes to min 1MB for KIP-1030 (#18140) 54fe0f0135f is described below commit 54fe0f0135f807959fd6a22c1db1b1d890561489 Author: Jason Taylor <jasta...@amazon.com> AuthorDate: Thu Jan 16 15:07:00 2025 +0000 KAFKA-16368: Add a new constraint for segment.bytes to min 1MB for KIP-1030 (#18140) Reviewers: Divij Vaidya <di...@amazon.com> --- core/src/test/java/kafka/admin/DeleteTopicTest.java | 4 +++- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 9 +++++---- core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 2 +- core/src/test/scala/unit/kafka/server/LogOffsetTest.scala | 1 - docs/upgrade.html | 4 ++++ .../java/org/apache/kafka/storage/internals/log/LogConfig.java | 2 +- 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/kafka/admin/DeleteTopicTest.java b/core/src/test/java/kafka/admin/DeleteTopicTest.java index c3f1efef7b5..be87e086f7f 100644 --- a/core/src/test/java/kafka/admin/DeleteTopicTest.java +++ b/core/src/test/java/kafka/admin/DeleteTopicTest.java @@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; +import scala.Option; import scala.jdk.javaapi.OptionConverters; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -238,7 +239,6 @@ public class DeleteTopicTest { @ClusterTest(serverProperties = { @ClusterConfigProperty(key = "log.cleaner.enable", value = "true"), @ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"), - @ClusterConfigProperty(key = "log.segment.bytes", value = "100"), @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "1048577") }) public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception { @@ -251,6 +251,8 @@ public class DeleteTopicTest { "Replicas for topic test not created."); UnifiedLog log = 
server.logManager().getLog(topicPartition, false).get(); writeDups(100, 3, log); + // force roll the segment so that cleaner can work on it + server.logManager().getLog(topicPartition, false).get().roll(Option.empty()); // wait for cleaner to clean server.logManager().cleaner().awaitCleaned(topicPartition, 0, 60000); admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get(); diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 26c65f46603..49a0ebc21f4 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -128,7 +128,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested") props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN") props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) - props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update + props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") // low value to test log rolling on config update props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString) props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString) @@ -587,7 +587,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val props = new Properties val logIndexSizeMaxBytes = "100000" val logRetentionMs = TimeUnit.DAYS.toMillis(1) - props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000") + props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") 
props.put(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(2).toString) props.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(1).toString) props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, logIndexSizeMaxBytes) @@ -609,11 +609,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString) props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, "1000") props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, "1000") - reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")) + reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576")) // Verify that all broker defaults have been updated servers.foreach { server => props.forEach { (k, v) => + TestUtils.waitUntilTrue(() => server.config.originals.get(k) != null, "Configs not present") assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k") } } @@ -624,7 +625,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup "Config not updated in LogManager") val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) - TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated") + TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated") val KafkaConfigToLogConfigName: Map[String, String] = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) } props.asScala.foreach { case (k, v) => diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 971018ffc19..31c192ff9ef 100755 --- 
a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1118,7 +1118,7 @@ class KafkaConfigTest { case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertDynamic(kafkaConfigProp, "5", () => config.zstdCompressionLevel) case TopicConfig.SEGMENT_BYTES_CONFIG => - assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes) + assertDynamic(kafkaConfigProp, 1048576, () => config.logSegmentBytes) case TopicConfig.SEGMENT_MS_CONFIG => assertDynamic(kafkaConfigProp, 10001L, () => config.logRollTimeMillis) case TopicConfig.DELETE_RETENTION_MS_CONFIG => diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index dc3e7d7c6ab..efb057bd1cb 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -52,7 +52,6 @@ class LogOffsetTest extends BaseRequestTest { props.put("num.partitions", "20") props.put("log.retention.hours", "10") props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString) - props.put("log.segment.bytes", "140") } @ParameterizedTest diff --git a/docs/upgrade.html b/docs/upgrade.html index 8cb91d7b248..1be5e1836c7 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -111,6 +111,10 @@ The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a> </li> + <li> + The minimum <code>segment.bytes/log.segment.bytes</code> has changed from 14 bytes to 1MB. 
+ See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a> + </li> </ul> </li> <li><b>MirrorMaker</b> diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index e8935249261..f4294329f25 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -159,7 +159,7 @@ public class LogConfig extends AbstractConfig { .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC) .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC) .define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC) - .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC) + .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC) .define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC) .define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)