This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eea369af947 KAFKA-14588 Log cleaner configuration move to
CleanerConfig (#15387)
eea369af947 is described below
commit eea369af947dcff567f849183ba2217ac5e9a2ba
Author: Nikolay <[email protected]>
AuthorDate: Tue Mar 5 13:11:56 2024 +0300
KAFKA-14588 Log cleaner configuration move to CleanerConfig (#15387)
In order to move ConfigCommand to tools we must move all it's dependencies
which includes KafkaConfig and other core classes to java. This PR moves log
cleaner configuration to CleanerConfig class of storage module.
Reviewers: Chia-Ping Tsai <[email protected]>
---
build.gradle | 2 +
checkstyle/import-control-core.xml | 1 +
checkstyle/import-control.xml | 2 +
.../util/clusters/EmbeddedKafkaCluster.java | 3 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 12 ++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 76 +++++++---------------
.../java/kafka/testkit/KafkaClusterTestKit.java | 3 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 12 ++--
.../server/DynamicBrokerReconfigurationTest.scala | 32 ++++-----
.../LogCleanerParameterizedIntegrationTest.scala | 12 ++--
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 8 +--
.../unit/kafka/server/ControllerApisTest.scala | 13 ++--
.../kafka/server/DynamicBrokerConfigTest.scala | 18 ++---
.../scala/unit/kafka/server/KafkaConfigTest.scala | 18 ++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../server/config/ServerTopicConfigSynonyms.java | 2 +-
.../org/apache/kafka/server/config/Defaults.java | 7 --
.../kafka/storage/internals/log/CleanerConfig.java | 42 ++++++++++++
.../integration/utils/EmbeddedKafkaCluster.java | 3 +-
19 files changed, 141 insertions(+), 127 deletions(-)
diff --git a/build.gradle b/build.gradle
index f2ba22e86b3..7dc8f50342f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2177,6 +2177,7 @@ project(':streams') {
testImplementation project(':core')
testImplementation project(':tools')
testImplementation project(':core').sourceSets.test.output
+ testImplementation project(':storage')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.log4j
@@ -2975,6 +2976,7 @@ project(':connect:runtime') {
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
+ testImplementation project(':storage')
testImplementation project(':connect:test-plugins')
testImplementation libs.easymock
diff --git a/checkstyle/import-control-core.xml
b/checkstyle/import-control-core.xml
index 3f9a21fffc6..782d2fe4617 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -56,6 +56,7 @@
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
+ <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
</subpackage>
<subpackage name="tools">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index caf1fe5ebe1..a52b3d94e32 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -410,6 +410,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
+ <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
</subpackage>
<subpackage name="test">
@@ -601,6 +602,7 @@
<allow pkg="com.fasterxml.jackson.core.type" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.eclipse.jetty.client"/>
+ <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
</subpackage>
</subpackage>
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index e3694cae291..c15aa27ae59 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +161,7 @@ public class EmbeddedKafkaCluster {
putIfAbsent(brokerConfig,
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(),
false);
// reduce the size of the log cleaner map to reduce test memory usage
- putIfAbsent(brokerConfig,
KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+ putIfAbsent(brokerConfig,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
Object listenerConfig =
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
if (listenerConfig == null)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8098ea237e0..b653f40b287 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,13 +499,13 @@ class LogCleaner(initialConfig: CleanerConfig,
object LogCleaner {
val ReconfigurableConfigs: Set[String] = Set(
- KafkaConfig.LogCleanerThreadsProp,
- KafkaConfig.LogCleanerDedupeBufferSizeProp,
- KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
- KafkaConfig.LogCleanerIoBufferSizeProp,
+ CleanerConfig.LOG_CLEANER_THREADS_PROP,
+ CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
+ CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
+ CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
KafkaConfig.MessageMaxBytesProp,
- KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
- KafkaConfig.LogCleanerBackoffMsProp
+ CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
+ CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
)
def cleanerConfig(config: KafkaConfig): CleanerConfig = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c6f51a000e2..eaddb047da8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.server.config.{Defaults,
ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
-import org.apache.kafka.storage.internals.log.{LogConfig,
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig
@@ -211,17 +211,6 @@ object KafkaConfig {
val LogRetentionBytesProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_BYTES_CONFIG)
val LogCleanupIntervalMsProp = LogConfigPrefix +
"retention.check.interval.ms"
val LogCleanupPolicyProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.CLEANUP_POLICY_CONFIG)
- val LogCleanerThreadsProp = LogConfigPrefix + "cleaner.threads"
- val LogCleanerIoMaxBytesPerSecondProp = LogConfigPrefix +
"cleaner.io.max.bytes.per.second"
- val LogCleanerDedupeBufferSizeProp = LogConfigPrefix +
"cleaner.dedupe.buffer.size"
- val LogCleanerIoBufferSizeProp = LogConfigPrefix + "cleaner.io.buffer.size"
- val LogCleanerDedupeBufferLoadFactorProp = LogConfigPrefix +
"cleaner.io.buffer.load.factor"
- val LogCleanerBackoffMsProp = LogConfigPrefix + "cleaner.backoff.ms"
- val LogCleanerMinCleanRatioProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG)
- val LogCleanerEnableProp = LogConfigPrefix + "cleaner.enable"
- val LogCleanerDeleteRetentionMsProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG)
- val LogCleanerMinCompactionLagMsProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)
- val LogCleanerMaxCompactionLagMsProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)
val LogIndexSizeMaxBytesProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)
val LogIndexIntervalBytesProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)
val LogFlushIntervalMessagesProp =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG)
@@ -652,25 +641,6 @@ object KafkaConfig {
val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log
cleaner checks whether any log is eligible for deletion"
val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond
the retention window. A comma separated list of valid policies. Valid policies
are: \"delete\" and \"compact\""
- val LogCleanerThreadsDoc = "The number of background threads to use for log
cleaning"
- val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so
that the sum of its read and write i/o will be less than this value on average"
- val LogCleanerDedupeBufferSizeDoc = "The total memory used for log
deduplication across all cleaner threads"
- val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O
buffers across all cleaner threads"
- val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load
factor. The percentage full the dedupe buffer can become. A higher value " +
- "will allow more log to be cleaned at once but will lead to more hash
collisions"
- val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no
logs to clean"
- val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total
log for a log to eligible for cleaning. " +
- "If the " + LogCleanerMaxCompactionLagMsProp + " or the " +
LogCleanerMinCompactionLagMsProp +
- " configurations are also specified, then the log compactor considers the
log eligible for compaction " +
- "as soon as either: (i) the dirty ratio threshold has been met and the log
has had dirty (uncompacted) " +
- "records for at least the " + LogCleanerMinCompactionLagMsProp + "
duration, or (ii) if the log has had " +
- "dirty (uncompacted) records for at most the " +
LogCleanerMaxCompactionLagMsProp + " period."
- val LogCleanerEnableDoc = "Enable the log cleaner process to run on the
server. Should be enabled if using any topics with a cleanup.policy=compact
including the internal offsets topic. If disabled those topics will not be
compacted and continually grow in size."
- val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone
message markers for log compacted topics. This setting also gives a bound " +
- "on the time in which a consumer must complete a read if they begin from
offset 0 to ensure that they get a valid snapshot of the final stage (otherwise
" +
- "tombstones messages may be collected before a consumer completes their
scan)."
- val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will
remain uncompacted in the log. Only applicable for logs that are being
compacted."
- val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will
remain ineligible for compaction in the log. Only applicable for logs that are
being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
val LogIndexIntervalBytesDoc = "The interval with which we add an entry to
the offset index."
val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a
log partition before messages are flushed to disk."
@@ -1075,17 +1045,17 @@ object KafkaConfig {
.define(LogRetentionBytesProp, LONG, LogConfig.DEFAULT_RETENTION_BYTES,
HIGH, LogRetentionBytesDoc)
.define(LogCleanupIntervalMsProp, LONG,
Defaults.LOG_CLEANUP_INTERVAL_MS, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
.define(LogCleanupPolicyProp, LIST, LogConfig.DEFAULT_CLEANUP_POLICY,
ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT,
TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, LogCleanupPolicyDoc)
- .define(LogCleanerThreadsProp, INT, Defaults.LOG_CLEANER_THREADS,
atLeast(0), MEDIUM, LogCleanerThreadsDoc)
- .define(LogCleanerIoMaxBytesPerSecondProp, DOUBLE,
Defaults.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM,
LogCleanerIoMaxBytesPerSecondDoc)
- .define(LogCleanerDedupeBufferSizeProp, LONG,
Defaults.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, LogCleanerDedupeBufferSizeDoc)
- .define(LogCleanerIoBufferSizeProp, INT,
Defaults.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM,
LogCleanerIoBufferSizeDoc)
- .define(LogCleanerDedupeBufferLoadFactorProp, DOUBLE,
Defaults.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM,
LogCleanerDedupeBufferLoadFactorDoc)
- .define(LogCleanerBackoffMsProp, LONG, Defaults.LOG_CLEANER_BACKOFF_MS,
atLeast(0), MEDIUM, LogCleanerBackoffMsDoc)
- .define(LogCleanerMinCleanRatioProp, DOUBLE,
LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM,
LogCleanerMinCleanRatioDoc)
- .define(LogCleanerEnableProp, BOOLEAN, Defaults.LOG_CLEANER_ENABLE,
MEDIUM, LogCleanerEnableDoc)
- .define(LogCleanerDeleteRetentionMsProp, LONG,
LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM,
LogCleanerDeleteRetentionMsDoc)
- .define(LogCleanerMinCompactionLagMsProp, LONG,
LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM,
LogCleanerMinCompactionLagMsDoc)
- .define(LogCleanerMaxCompactionLagMsProp, LONG,
LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM,
LogCleanerMaxCompactionLagMsDoc)
+ .define(CleanerConfig.LOG_CLEANER_THREADS_PROP, INT,
CleanerConfig.LOG_CLEANER_THREADS, atLeast(0), MEDIUM,
CleanerConfig.LOG_CLEANER_THREADS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, DOUBLE,
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM,
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC)
+ .define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, LONG,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC)
+ .define(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, INT,
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM,
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_DOC)
+ .define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
DOUBLE, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC)
+ .define(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, LONG,
CleanerConfig.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM,
CleanerConfig.LOG_CLEANER_BACKOFF_MS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, DOUBLE,
LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM,
CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_DOC)
+ .define(CleanerConfig.LOG_CLEANER_ENABLE_PROP, BOOLEAN,
CleanerConfig.LOG_CLEANER_ENABLE, MEDIUM, CleanerConfig.LOG_CLEANER_ENABLE_DOC)
+ .define(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, LONG,
LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM,
CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG,
LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM,
CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG,
LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM,
CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC)
.define(LogIndexSizeMaxBytesProp, INT,
LogConfig.DEFAULT_SEGMENT_INDEX_BYTES, atLeast(4), MEDIUM,
LogIndexSizeMaxBytesDoc)
.define(LogIndexIntervalBytesProp, INT,
LogConfig.DEFAULT_INDEX_INTERVAL_BYTES, atLeast(0), MEDIUM,
LogIndexIntervalBytesDoc)
.define(LogFlushIntervalMessagesProp, LONG,
LogConfig.DEFAULT_FLUSH_MESSAGES_INTERVAL, atLeast(1), HIGH,
LogFlushIntervalMessagesDoc)
@@ -1652,7 +1622,7 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
val logDirs =
CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
def logFlushIntervalMessages =
getLong(KafkaConfig.LogFlushIntervalMessagesProp)
- val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
+ val logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
def numRecoveryThreadsPerDataDir =
getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs =
getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs =
getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
@@ -1662,16 +1632,16 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
val offsetsRetentionCheckIntervalMs =
getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
- val logCleanerDedupeBufferSize =
getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
- val logCleanerDedupeBufferLoadFactor =
getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
- val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
- val logCleanerIoMaxBytesPerSecond =
getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
- def logCleanerDeleteRetentionMs =
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
- def logCleanerMinCompactionLagMs =
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
- def logCleanerMaxCompactionLagMs =
getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
- val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
- def logCleanerMinCleanRatio =
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
- val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
+ val logCleanerDedupeBufferSize =
getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
+ val logCleanerDedupeBufferLoadFactor =
getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
+ val logCleanerIoBufferSize =
getInt(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP)
+ val logCleanerIoMaxBytesPerSecond =
getDouble(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP)
+ def logCleanerDeleteRetentionMs =
getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP)
+ def logCleanerMinCompactionLagMs =
getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP)
+ def logCleanerMaxCompactionLagMs =
getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP)
+ val logCleanerBackoffMs = getLong(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP)
+ def logCleanerMinCleanRatio =
getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP)
+ val logCleanerEnable = getBoolean(CleanerConfig.LOG_CLEANER_ENABLE_PROP)
def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index fa11fa9b186..4c6d1f9ef4a 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -41,6 +41,7 @@ import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,7 +193,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
props.put(RaftConfig.QUORUM_VOTERS_CONFIG,
uninitializedQuorumVotersString);
// reduce log cleaner offset map memory usage
- props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(),
"2097152");
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
"2097152");
// Add associated broker node property overrides
if (brokerNode != null) {
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 54be3337625..e084454f5ff 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType,
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica,
Uuid}
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.server.config.Defaults
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -447,7 +447,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
configs.get(brokerResource2).entries.size)
assertEquals(brokers(2).config.brokerId.toString,
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
assertEquals(brokers(2).config.logCleanerThreads.toString,
-
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
+
configs.get(brokerResource2).get(CleanerConfig.LOG_CLEANER_THREADS_PROP).value)
checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
@@ -2532,7 +2532,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
.all().get(15, TimeUnit.SECONDS)
val newLogCleanerDeleteRetention = new Properties
-
newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp,
"34")
+
newLogCleanerDeleteRetention.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
"34")
TestUtils.incrementalAlterConfigs(brokers, client,
newLogCleanerDeleteRetention, perBrokerConfig = true)
.all().get(15, TimeUnit.SECONDS)
@@ -2543,14 +2543,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
controllerServer.config.nodeId.toString)
controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
Collections.singletonMap(controllerNodeResource,
- Collections.singletonMap(KafkaConfig.LogCleanerDeleteRetentionMsProp,
+
Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))),
false).get()
ensureConsistentKRaftMetadata()
}
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
- KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
- s"Timed out waiting for change to
${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
+ CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
"").toString.equals("34")),
+ s"Timed out waiting for change to
${CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP}",
waitTimeMs = 60000L)
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 4be668aa321..c984eae0272 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -254,7 +254,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp,
"1680000000")
expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168")
expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168")
- expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1")
+ expectedProps.setProperty(CleanerConfig.LOG_CLEANER_THREADS_PROP, "1")
val logRetentionMs = configEntry(configDesc,
KafkaConfig.LogRetentionTimeMillisProp)
verifyConfig(KafkaConfig.LogRetentionTimeMillisProp, logRetentionMs,
isSensitive = false, isReadOnly = false, expectedProps)
@@ -264,8 +264,8 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
val logRollHours = configEntry(configDesc,
KafkaConfig.LogRollTimeHoursProp)
verifyConfig(KafkaConfig.LogRollTimeHoursProp, logRollHours,
isSensitive = false, isReadOnly = true, expectedProps)
- val logCleanerThreads = configEntry(configDesc,
KafkaConfig.LogCleanerThreadsProp)
- verifyConfig(KafkaConfig.LogCleanerThreadsProp, logCleanerThreads,
+ val logCleanerThreads = configEntry(configDesc,
CleanerConfig.LOG_CLEANER_THREADS_PROP)
+ verifyConfig(CleanerConfig.LOG_CLEANER_THREADS_PROP, logCleanerThreads,
isSensitive = false, isReadOnly = false, expectedProps)
def synonymsList(configEntry: ConfigEntry): List[(String, ConfigSource)] =
@@ -278,7 +278,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
(KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)),
synonymsList(logRetentionHours))
assertEquals(List((KafkaConfig.LogRollTimeHoursProp,
ConfigSource.DEFAULT_CONFIG)), synonymsList(logRollHours))
- assertEquals(List((KafkaConfig.LogCleanerThreadsProp,
ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
+ assertEquals(List((CleanerConfig.LOG_CLEANER_THREADS_PROP,
ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -536,19 +536,19 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
val props = new Properties
- props.put(KafkaConfig.LogCleanerThreadsProp, "2")
- props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
- props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
- props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
+ props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, "2")
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "20000000")
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, "0.8")
+ props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, "300000")
props.put(KafkaConfig.MessageMaxBytesProp, "40000")
- props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
- props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
+ props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
"50000000")
+ props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "6000")
// Verify cleaner config was updated. Wait for one of the configs to be
updated and verify
// that all other others were updated at the same time since they are
reconfigured together
var newCleanerConfig: CleanerConfig = null
TestUtils.waitUntilTrue(() => {
- reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogCleanerThreadsProp, "2"))
+ reconfigureServers(props, perBrokerConfig = false,
(CleanerConfig.LOG_CLEANER_THREADS_PROP, "2"))
newCleanerConfig = servers.head.logManager.cleaner.currentConfig
newCleanerConfig.numThreads == 2
}, "Log cleaner not reconfigured", 60000)
@@ -566,8 +566,8 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
def cleanerThreads =
Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
cleanerThreads.take(2).foreach(_.interrupt())
TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2,
"Threads did not exit")
- props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
- reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogCleanerBackoffMsProp, "8000"))
+ props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "8000")
+ reconfigureServers(props, perBrokerConfig = false,
(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "8000"))
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
// Verify that produce/consume worked throughout this test without any
retries in producer
@@ -635,10 +635,10 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
props.put(KafkaConfig.LogRetentionTimeMillisProp,
TimeUnit.DAYS.toMillis(1).toString)
props.put(KafkaConfig.MessageMaxBytesProp, "100000")
props.put(KafkaConfig.LogIndexIntervalBytesProp, "10000")
- props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp,
TimeUnit.DAYS.toMillis(1).toString)
- props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, "60000")
+ props.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
TimeUnit.DAYS.toMillis(1).toString)
+ props.put(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, "60000")
props.put(KafkaConfig.LogDeleteDelayMsProp, "60000")
- props.put(KafkaConfig.LogCleanerMinCleanRatioProp, "0.3")
+ props.put(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, "0.3")
props.put(KafkaConfig.LogCleanupPolicyProp, "delete")
props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
props.put(KafkaConfig.MinInSyncReplicasProp, "2")
diff --git
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 8729045db7e..49e518ac2af 100755
---
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -251,13 +251,13 @@ class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrati
def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig):
KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, "localhost:2181")
- props.put(KafkaConfig.LogCleanerThreadsProp,
cleanerConfig.numThreads.toString)
- props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp,
cleanerConfig.dedupeBufferSize.toString)
- props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
cleanerConfig.dedupeBufferLoadFactor.toString)
- props.put(KafkaConfig.LogCleanerIoBufferSizeProp,
cleanerConfig.ioBufferSize.toString)
+ props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP,
cleanerConfig.numThreads.toString)
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
cleanerConfig.dedupeBufferSize.toString)
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
cleanerConfig.dedupeBufferLoadFactor.toString)
+ props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
cleanerConfig.ioBufferSize.toString)
props.put(KafkaConfig.MessageMaxBytesProp,
cleanerConfig.maxMessageSize.toString)
- props.put(KafkaConfig.LogCleanerBackoffMsProp,
cleanerConfig.backoffMs.toString)
- props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
cleanerConfig.maxIoBytesPerSecond.toString)
+ props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP,
cleanerConfig.backoffMs.toString)
+ props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
cleanerConfig.maxIoBytesPerSecond.toString)
KafkaConfig.fromProps(props)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 161493457e1..b3a72592196 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1959,7 +1959,7 @@ class LogCleanerTest extends Logging {
@Test
def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
- oldKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
"10000000")
+
oldKafkaProps.setProperty(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
"10000000")
val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new
KafkaConfig(oldKafkaProps)),
logDirs = Array(TestUtils.tempDir()),
@@ -1973,14 +1973,14 @@ class LogCleanerTest extends Logging {
}
try {
- assertEquals(10000000, logCleaner.throttler.desiredRatePerSec,
s"Throttler.desiredRatePerSec should be initialized from initial
`${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+ assertEquals(10000000, logCleaner.throttler.desiredRatePerSec,
s"Throttler.desiredRatePerSec should be initialized from initial
`${CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP}` config.")
val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
- newKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
"20000000")
+
newKafkaProps.setProperty(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
"20000000")
logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new
KafkaConfig(newKafkaProps))
- assertEquals(20000000, logCleaner.throttler.desiredRatePerSec,
s"Throttler.desiredRatePerSec should be updated with new
`${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+ assertEquals(20000000, logCleaner.throttler.desiredRatePerSec,
s"Throttler.desiredRatePerSec should be updated with new
`${CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP}` config.")
} finally {
logCleaner.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 00777e9677b..135de6e5144 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -55,6 +55,7 @@ import
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features,
MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.util.FutureUtils
+import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -318,19 +319,19 @@ class ControllerApisTest {
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new
OldAlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000")).iterator())),
new OldAlterConfigsResource().
setResourceName("2").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new
OldAlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000")).iterator())),
new OldAlterConfigsResource().
setResourceName("2").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new
OldAlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000")).iterator())),
new OldAlterConfigsResource().
setResourceName("baz").
@@ -472,7 +473,7 @@ class ControllerApisTest {
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new
AlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
@@ -536,14 +537,14 @@ class ControllerApisTest {
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new
AlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new
AlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index be065f5b8ca..4e1c8eed276 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
-import org.apache.kafka.storage.internals.log.{LogConfig,
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
ProducerStateManagerConfig}
import org.apache.kafka.test.MockMetricsReporter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -215,7 +215,7 @@ class DynamicBrokerConfigTest {
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps,
nonDynamicProps)
// Test update of configs with invalid type
- val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
+ val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "invalid")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps,
invalidProps)
val excludedTopicConfig = Map(KafkaConfig.LogMessageFormatVersionProp ->
"0.10.2")
@@ -225,21 +225,21 @@ class DynamicBrokerConfigTest {
@Test
def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 8181)
- origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
+ origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
"100000000")
val config = KafkaConfig(origProps)
config.dynamicConfig.initialize(None, None)
val validProps = Map.empty[String, String]
- val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
+ val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20")
def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = {
- val cleanerThreads =
configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt
+ val cleanerThreads =
configs.get(CleanerConfig.LOG_CLEANER_THREADS_PROP).toString.toInt
if (cleanerThreads <=0 || cleanerThreads >= 5)
throw new ConfigException(s"Invalid cleaner threads $cleanerThreads")
}
val reconfigurable = new Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
- override def reconfigurableConfigs(): util.Set[String] =
Set(KafkaConfig.LogCleanerThreadsProp).asJava
+ override def reconfigurableConfigs(): util.Set[String] =
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP).asJava
override def validateReconfiguration(configs: util.Map[String, _]): Unit
= validateLogCleanerConfig(configs)
override def reconfigure(configs: util.Map[String, _]): Unit = {}
}
@@ -248,7 +248,7 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.removeReconfigurable(reconfigurable)
val brokerReconfigurable = new BrokerReconfigurable {
- override def reconfigurableConfigs: collection.Set[String] =
Set(KafkaConfig.LogCleanerThreadsProp)
+ override def reconfigurableConfigs: collection.Set[String] =
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP)
override def validateReconfiguration(newConfig: KafkaConfig): Unit =
validateLogCleanerConfig(newConfig.originals)
override def reconfigure(oldConfig: KafkaConfig, newConfig:
KafkaConfig): Unit = {}
}
@@ -260,8 +260,8 @@ class DynamicBrokerConfigTest {
def testReconfigurableValidation(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 8181)
val config = KafkaConfig(origProps)
- val invalidReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp,
KafkaConfig.BrokerIdProp, "some.prop")
- val validReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp,
KafkaConfig.LogCleanerDedupeBufferSizeProp, "some.prop")
+ val invalidReconfigurableProps =
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, KafkaConfig.BrokerIdProp,
"some.prop")
+ val validReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "some.prop")
def createReconfigurable(configs: Set[String]) = new Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e9426245b14..a5d4d961fe1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.function.Executable
import scala.annotation.nowarn
@@ -845,14 +845,14 @@ class KafkaConfigTest {
case KafkaConfig.LogRetentionBytesProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanupIntervalMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogCleanupPolicyProp =>
assertPropertyInvalid(baseProperties, name, "unknown_policy", "0")
- case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerDedupeBufferSizeProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
- case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerEnableProp =>
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
- case KafkaConfig.LogCleanerDeleteRetentionMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerMinCompactionLagMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerMaxCompactionLagMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerMinCleanRatioProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
+ case CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_ENABLE_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogFlushSchedulerIntervalMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7b1ce80cbed..17ee3139a51 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -373,7 +373,7 @@ object TestUtils extends Logging {
props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
props.put(KafkaConfig.LogDeleteDelayMsProp, "1000")
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
- props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152")
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
if (!props.containsKey(KafkaConfig.OffsetsTopicPartitionsProp))
props.put(KafkaConfig.OffsetsTopicPartitionsProp, "5")
diff --git
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 320de2db6b9..02cdc1cc875 100644
---
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils;
public final class ServerTopicConfigSynonyms {
private static final String LOG_PREFIX = "log.";
- private static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
+ public static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
/**
* Maps topic configurations to their equivalent broker configurations.
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index e55f641790f..0c7ca123b87 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -106,13 +106,6 @@ public class Defaults {
public static final int NUM_PARTITIONS = 1;
public static final String LOG_DIR = "/tmp/kafka-logs";
public static final long LOG_CLEANUP_INTERVAL_MS = 5 * 60 * 1000L;
- public static final int LOG_CLEANER_THREADS = 1;
- public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND =
Double.MAX_VALUE;
- public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 *
1024L;
- public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024;
- public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d;
- public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000;
- public static final boolean LOG_CLEANER_ENABLE = true;
public static final int LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS =
60000;
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
index 9e38d0c02e9..8168197fe08 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
@@ -16,11 +16,53 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+
/**
* Configuration parameters for the log cleaner.
*/
public class CleanerConfig {
public static final String HASH_ALGORITHM = "MD5";
+ public static final int LOG_CLEANER_THREADS = 1;
+ public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND =
Double.MAX_VALUE;
+ public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 *
1024L;
+ public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024;
+ public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d;
+ public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000;
+ public static final boolean LOG_CLEANER_ENABLE = true;
+
+ public static final String LOG_CLEANER_THREADS_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "threads";
+ public static final String LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.max.bytes.per.second";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "dedupe.buffer.size";
+ public static final String LOG_CLEANER_IO_BUFFER_SIZE_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.buffer.size";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.buffer.load.factor";
+ public static final String LOG_CLEANER_BACKOFF_MS_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "backoff.ms";
+ public static final String LOG_CLEANER_MIN_CLEAN_RATIO_PROP =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG);
+ public static final String LOG_CLEANER_ENABLE_PROP =
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "enable";
+ public static final String LOG_CLEANER_DELETE_RETENTION_MS_PROP =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG);
+ public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
+ public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP =
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
+
+ public static final String LOG_CLEANER_MIN_CLEAN_RATIO_DOC = "The minimum
ratio of dirty log to total log for a log to eligible for cleaning. " +
+ "If the " + LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP + " or the " +
LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP +
+ " configurations are also specified, then the log compactor
considers the log eligible for compaction " +
+ "as soon as either: (i) the dirty ratio threshold has been met and
the log has had dirty (uncompacted) " +
+ "records for at least the " +
LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP + " duration, or (ii) if the log has had
" +
+ "dirty (uncompacted) records for at most the " +
LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP + " period.";
+ public static final String LOG_CLEANER_THREADS_DOC = "The number of
background threads to use for log cleaning";
+ public static final String LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC = "The
log cleaner will be throttled so that the sum of its read and write i/o will be
less than this value on average";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC = "The total
memory used for log deduplication across all cleaner threads";
+ public static final String LOG_CLEANER_IO_BUFFER_SIZE_DOC = "The total
memory used for log cleaner I/O buffers across all cleaner threads";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC =
"Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer
can become. A higher value " +
+ "will allow more log to be cleaned at once but will lead to more
hash collisions";
+ public static final String LOG_CLEANER_BACKOFF_MS_DOC = "The amount of
time to sleep when there are no logs to clean";
+ public static final String LOG_CLEANER_ENABLE_DOC = "Enable the log
cleaner process to run on the server. Should be enabled if using any topics
with a cleanup.policy=compact including the internal offsets topic. If disabled
those topics will not be compacted and continually grow in size.";
+ public static final String LOG_CLEANER_DELETE_RETENTION_MS_DOC = "The
amount of time to retain tombstone message markers for log compacted topics.
This setting also gives a bound " +
+ "on the time in which a consumer must complete a read if they
begin from offset 0 to ensure that they get a valid snapshot of the final stage
(otherwise " +
+ "tombstones messages may be collected before a consumer completes
their scan).";
+ public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC = "The
minimum time a message will remain uncompacted in the log. Only applicable for
logs that are being compacted.";
+ public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC = "The
maximum time a message will remain ineligible for compaction in the log. Only
applicable for logs that are being compacted.";
public final int numThreads;
public final long dedupeBufferSize;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 3f3e9a243bd..4232e1d74c9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
@@ -111,7 +112,7 @@ public class EmbeddedKafkaCluster {
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(),
"PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
- putIfAbsent(brokerConfig,
KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+ putIfAbsent(brokerConfig,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, KafkaConfig.GroupMinSessionTimeoutMsProp(),
0);
putIfAbsent(brokerConfig,
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig,
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);