This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 169e372  KAFKA-13067 Add internal config to lower the metadata log 
segment size (#11031)
169e372 is described below

commit 169e3724fa27b1a3854d2ac01ff6af8b0430d51d
Author: David Arthur <[email protected]>
AuthorDate: Tue Jul 13 19:23:32 2021 -0400

    KAFKA-13067 Add internal config to lower the metadata log segment size 
(#11031)
    
    Add an internal configuration in order to facilitate system and integration 
tests that need a smaller
    log segment size. Since this is not intended for use in production, log an 
ERROR message if it is
    set to a non-default level.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   | 16 ++++++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +++
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 31 +++++++++++++++++++---
 3 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 697d7b6..51e95d7 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -18,13 +18,15 @@ package kafka.raft
 
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Defaults, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
+import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, 
MetadataLogSegmentMinBytesProp}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
 import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, 
Records}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
-import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, 
ValidOffsetAndEpoch}
+import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, 
LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, 
ValidOffsetAndEpoch}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
 import java.io.File
@@ -514,6 +516,7 @@ object MetadataLogConfig {
   def apply(config: AbstractConfig, maxBatchSizeInBytes: Int, 
maxFetchSizeInBytes: Int): MetadataLogConfig = {
     new MetadataLogConfig(
       config.getInt(KafkaConfig.MetadataLogSegmentBytesProp),
+      config.getInt(KafkaConfig.MetadataLogSegmentMinBytesProp),
       config.getLong(KafkaConfig.MetadataLogSegmentMillisProp),
       config.getLong(KafkaConfig.MetadataMaxRetentionBytesProp),
       config.getLong(KafkaConfig.MetadataMaxRetentionMillisProp),
@@ -526,6 +529,7 @@ object MetadataLogConfig {
 }
 
 case class MetadataLogConfig(logSegmentBytes: Int,
+                             logSegmentMinBytes: Int,
                              logSegmentMillis: Long,
                              retentionMaxBytes: Long,
                              retentionMillis: Long,
@@ -553,6 +557,10 @@ object KafkaMetadataLog {
     LogConfig.validateValues(props)
     val defaultLogConfig = LogConfig(props)
 
+    if (config.logSegmentBytes < config.logSegmentMinBytes) {
+      throw new InvalidConfigurationException(s"Cannot set 
$MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+    }
+
     val log = Log(
       dir = dataDir,
       config = defaultLogConfig,
@@ -578,6 +586,12 @@ object KafkaMetadataLog {
       config
     )
 
+    // Print a warning if users have overridden the internal config
+    if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
+      metadataLog.error(s"Overriding $MetadataLogSegmentMinBytesProp is only 
supported for testing. Setting " +
+        s"this value too low may lead to an inability to write batches of 
metadata records.")
+    }
+
     // When recovering, truncate fully if the latest snapshot is after the log 
end offset. This can happen to a follower
     // when the follower crashes after downloading a snapshot from the leader 
but before it could truncate the log fully.
     metadataLog.truncateToLatestSnapshot()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 634bb4d..2a04673 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -381,6 +381,7 @@ object KafkaConfig {
   val MetadataSnapshotMaxNewRecordBytesProp = 
"metadata.log.max.record.bytes.between.snapshots"
   val ControllerListenerNamesProp = "controller.listener.names"
   val SaslMechanismControllerProtocolProp = 
"sasl.mechanism.controller.protocol"
+  val MetadataLogSegmentMinBytesProp = "metadata.log.segment.min.bytes"
   val MetadataLogSegmentBytesProp = "metadata.log.segment.bytes"
   val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
   val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
@@ -678,6 +679,8 @@ object KafkaConfig {
     "if running in KRaft mode. The ZK-based controller will not use this 
configuration."
   val SaslMechanismControllerProtocolDoc = "SASL mechanism used for 
communication with controllers. Default is GSSAPI."
   val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log 
file."
+  val MetadataLogSegmentMinBytesDoc = "Override the minimum size for a single 
metadata log file. This should be used for testing only."
+
   val MetadataLogSegmentMillisDoc = "The maximum time before a new metadata 
log file is rolled out (in milliseconds)."
   val MetadataMaxRetentionBytesDoc = "The maximum combined size of the 
metadata log and snapshots before deleting old " +
     "snapshots and log files. Since at least one snapshot must exist before 
any logs can be deleted, this is a soft limit."
@@ -1073,6 +1076,7 @@ object KafkaConfig {
       .define(SaslMechanismControllerProtocolProp, STRING, 
SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, 
SaslMechanismControllerProtocolDoc)
       .define(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .define(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes, 
atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+      .defineInternal(MetadataLogSegmentMinBytesProp, INT, 8 * 1024 * 1024, 
atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentMinBytesDoc)
       .define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 
60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
       .define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, 
null, HIGH, MetadataMaxRetentionBytesDoc)
       .define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours 
* 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala 
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index fb477f3..5fb4b12 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -17,9 +17,10 @@
 package kafka.raft
 
 import kafka.log.{Defaults, Log, SegmentDeletion}
-import kafka.server.KafkaRaftServer
+import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, 
MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, 
ProcessRolesProp}
+import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.errors.{InvalidConfigurationException, 
RecordTooLargeException}
 import org.apache.kafka.common.protocol
 import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
SimpleRecord}
@@ -35,7 +36,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import java.io.File
 import java.nio.ByteBuffer
 import java.nio.file.{Files, Path}
-import java.util.{Collections, Optional}
+import java.util
+import java.util.{Collections, Optional, Properties}
 
 final class KafkaMetadataLogTest {
   import KafkaMetadataLogTest._
@@ -54,6 +56,25 @@ final class KafkaMetadataLogTest {
   }
 
   @Test
+  def testConfig(): Unit = {
+    val props = new Properties()
+    props.put(ProcessRolesProp, util.Arrays.asList("broker"))
+    props.put(NodeIdProp, Int.box(1))
+    props.put(MetadataLogSegmentBytesProp, Int.box(10240))
+    props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
+    assertThrows(classOf[InvalidConfigurationException], () => {
+      val kafkaConfig = KafkaConfig.fromProps(props)
+      val metadataConfig = MetadataLogConfig.apply(kafkaConfig, 
KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+      buildMetadataLog(tempDir, mockTime, metadataConfig)
+    })
+
+    props.put(MetadataLogSegmentMinBytesProp, Int.box(10240))
+    val kafkaConfig = KafkaConfig.fromProps(props)
+    val metadataConfig = MetadataLogConfig.apply(kafkaConfig, 
KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+    buildMetadataLog(tempDir, mockTime, metadataConfig)
+  }
+
+  @Test
   def testUnexpectedAppendOffset(): Unit = {
     val log = buildMetadataLog(tempDir, mockTime)
 
@@ -732,6 +753,7 @@ final class KafkaMetadataLogTest {
   def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
     val config = MetadataLogConfig(
       logSegmentBytes = 512,
+      logSegmentMinBytes = 512,
       logSegmentMillis = 10 * 1000,
       retentionMaxBytes = 256,
       retentionMillis = 60 * 1000,
@@ -776,6 +798,7 @@ final class KafkaMetadataLogTest {
     // Generate some logs and a few snapshots, set retention low and verify 
that cleaning occurs
     val config = DefaultMetadataLogConfig.copy(
       logSegmentBytes = 1024,
+      logSegmentMinBytes = 1024,
       logSegmentMillis = 10 * 1000,
       retentionMaxBytes = 1024,
       retentionMillis = 60 * 1000,
@@ -810,6 +833,7 @@ final class KafkaMetadataLogTest {
     // Set retention equal to the segment size and generate slightly more than 
one segment of logs
     val config = DefaultMetadataLogConfig.copy(
       logSegmentBytes = 10240,
+      logSegmentMinBytes = 10240,
       logSegmentMillis = 10 * 1000,
       retentionMaxBytes = 10240,
       retentionMillis = 60 * 1000,
@@ -865,6 +889,7 @@ object KafkaMetadataLogTest {
 
   val DefaultMetadataLogConfig = MetadataLogConfig(
     logSegmentBytes = 100 * 1024,
+    logSegmentMinBytes = 100 * 1024,
     logSegmentMillis = 10 * 1000,
     retentionMaxBytes = 100 * 1024,
     retentionMillis = 60 * 1000,

Reply via email to