This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 169e372 KAFKA-13067 Add internal config to lower the metadata log
segment size (#11031)
169e372 is described below
commit 169e3724fa27b1a3854d2ac01ff6af8b0430d51d
Author: David Arthur <[email protected]>
AuthorDate: Tue Jul 13 19:23:32 2021 -0400
KAFKA-13067 Add internal config to lower the metadata log segment size
(#11031)
Add an internal configuration in order to facilitate system and integration
tests that need a smaller
log segment size. Since this is not intended for use in production, log an
ERROR message if it is
set to a non-default value.
Reviewers: Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 16 ++++++++++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +++
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 31 +++++++++++++++++++---
3 files changed, 47 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 697d7b6..51e95d7 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -18,13 +18,15 @@ package kafka.raft
import kafka.api.ApiVersion
import kafka.log.{AppendOrigin, Defaults, Log, LogConfig, LogOffsetSnapshot,
SnapshotGenerated}
+import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp,
MetadataLogSegmentMinBytesProp}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd,
KafkaConfig, LogDirFailureChannel, RequestLocal}
import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords,
Records}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
-import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo,
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog,
ValidOffsetAndEpoch}
+import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo,
LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog,
ValidOffsetAndEpoch}
import org.apache.kafka.snapshot.{FileRawSnapshotReader,
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath,
Snapshots}
import java.io.File
@@ -514,6 +516,7 @@ object MetadataLogConfig {
def apply(config: AbstractConfig, maxBatchSizeInBytes: Int,
maxFetchSizeInBytes: Int): MetadataLogConfig = {
new MetadataLogConfig(
config.getInt(KafkaConfig.MetadataLogSegmentBytesProp),
+ config.getInt(KafkaConfig.MetadataLogSegmentMinBytesProp),
config.getLong(KafkaConfig.MetadataLogSegmentMillisProp),
config.getLong(KafkaConfig.MetadataMaxRetentionBytesProp),
config.getLong(KafkaConfig.MetadataMaxRetentionMillisProp),
@@ -526,6 +529,7 @@ object MetadataLogConfig {
}
case class MetadataLogConfig(logSegmentBytes: Int,
+ logSegmentMinBytes: Int,
logSegmentMillis: Long,
retentionMaxBytes: Long,
retentionMillis: Long,
@@ -553,6 +557,10 @@ object KafkaMetadataLog {
LogConfig.validateValues(props)
val defaultLogConfig = LogConfig(props)
+ if (config.logSegmentBytes < config.logSegmentMinBytes) {
+ throw new InvalidConfigurationException(s"Cannot set
$MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
+ }
+
val log = Log(
dir = dataDir,
config = defaultLogConfig,
@@ -578,6 +586,12 @@ object KafkaMetadataLog {
config
)
+ // Print a warning if users have overridden the internal config
+ if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
+ metadataLog.error(s"Overriding $MetadataLogSegmentMinBytesProp is only
supported for testing. Setting " +
+ s"this value too low may lead to an inability to write batches of
metadata records.")
+ }
+
// When recovering, truncate fully if the latest snapshot is after the log
end offset. This can happen to a follower
// when the follower crashes after downloading a snapshot from the leader
but before it could truncate the log fully.
metadataLog.truncateToLatestSnapshot()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 634bb4d..2a04673 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -381,6 +381,7 @@ object KafkaConfig {
val MetadataSnapshotMaxNewRecordBytesProp =
"metadata.log.max.record.bytes.between.snapshots"
val ControllerListenerNamesProp = "controller.listener.names"
val SaslMechanismControllerProtocolProp =
"sasl.mechanism.controller.protocol"
+ val MetadataLogSegmentMinBytesProp = "metadata.log.segment.min.bytes"
val MetadataLogSegmentBytesProp = "metadata.log.segment.bytes"
val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
@@ -678,6 +679,8 @@ object KafkaConfig {
"if running in KRaft mode. The ZK-based controller will not use this
configuration."
val SaslMechanismControllerProtocolDoc = "SASL mechanism used for
communication with controllers. Default is GSSAPI."
val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log
file."
+ val MetadataLogSegmentMinBytesDoc = "Override the minimum size for a single
metadata log file. This should be used for testing only."
+
val MetadataLogSegmentMillisDoc = "The maximum time before a new metadata
log file is rolled out (in milliseconds)."
val MetadataMaxRetentionBytesDoc = "The maximum combined size of the
metadata log and snapshots before deleting old " +
"snapshots and log files. Since at least one snapshot must exist before
any logs can be deleted, this is a soft limit."
@@ -1073,6 +1076,7 @@ object KafkaConfig {
.define(SaslMechanismControllerProtocolProp, STRING,
SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH,
SaslMechanismControllerProtocolDoc)
.define(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
.define(MetadataLogSegmentBytesProp, INT, Defaults.LogSegmentBytes,
atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentBytesDoc)
+ .defineInternal(MetadataLogSegmentMinBytesProp, INT, 8 * 1024 * 1024,
atLeast(Records.LOG_OVERHEAD), HIGH, MetadataLogSegmentMinBytesDoc)
.define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 *
60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
.define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes,
null, HIGH, MetadataMaxRetentionBytesDoc)
.define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours
* 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index fb477f3..5fb4b12 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -17,9 +17,10 @@
package kafka.raft
import kafka.log.{Defaults, Log, SegmentDeletion}
-import kafka.server.KafkaRaftServer
+import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp,
MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp,
ProcessRolesProp}
+import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
RecordTooLargeException}
import org.apache.kafka.common.protocol
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
SimpleRecord}
@@ -35,7 +36,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import java.io.File
import java.nio.ByteBuffer
import java.nio.file.{Files, Path}
-import java.util.{Collections, Optional}
+import java.util
+import java.util.{Collections, Optional, Properties}
final class KafkaMetadataLogTest {
import KafkaMetadataLogTest._
@@ -54,6 +56,25 @@ final class KafkaMetadataLogTest {
}
@Test
+ def testConfig(): Unit = {
+ val props = new Properties()
+ props.put(ProcessRolesProp, util.Arrays.asList("broker"))
+ props.put(NodeIdProp, Int.box(1))
+ props.put(MetadataLogSegmentBytesProp, Int.box(10240))
+ props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
+ assertThrows(classOf[InvalidConfigurationException], () => {
+ val kafkaConfig = KafkaConfig.fromProps(props)
+ val metadataConfig = MetadataLogConfig.apply(kafkaConfig,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ buildMetadataLog(tempDir, mockTime, metadataConfig)
+ })
+
+ props.put(MetadataLogSegmentMinBytesProp, Int.box(10240))
+ val kafkaConfig = KafkaConfig.fromProps(props)
+ val metadataConfig = MetadataLogConfig.apply(kafkaConfig,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ buildMetadataLog(tempDir, mockTime, metadataConfig)
+ }
+
+ @Test
def testUnexpectedAppendOffset(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
@@ -732,6 +753,7 @@ final class KafkaMetadataLogTest {
def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
val config = MetadataLogConfig(
logSegmentBytes = 512,
+ logSegmentMinBytes = 512,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 256,
retentionMillis = 60 * 1000,
@@ -776,6 +798,7 @@ final class KafkaMetadataLogTest {
// Generate some logs and a few snapshots, set retention low and verify
that cleaning occurs
val config = DefaultMetadataLogConfig.copy(
logSegmentBytes = 1024,
+ logSegmentMinBytes = 1024,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 1024,
retentionMillis = 60 * 1000,
@@ -810,6 +833,7 @@ final class KafkaMetadataLogTest {
// Set retention equal to the segment size and generate slightly more than
one segment of logs
val config = DefaultMetadataLogConfig.copy(
logSegmentBytes = 10240,
+ logSegmentMinBytes = 10240,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 10240,
retentionMillis = 60 * 1000,
@@ -865,6 +889,7 @@ object KafkaMetadataLogTest {
val DefaultMetadataLogConfig = MetadataLogConfig(
logSegmentBytes = 100 * 1024,
+ logSegmentMinBytes = 100 * 1024,
logSegmentMillis = 10 * 1000,
retentionMaxBytes = 100 * 1024,
retentionMillis = 60 * 1000,