This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eea369af947 KAFKA-14588 Log cleaner configuration move to 
CleanerConfig (#15387)
eea369af947 is described below

commit eea369af947dcff567f849183ba2217ac5e9a2ba
Author: Nikolay <[email protected]>
AuthorDate: Tue Mar 5 13:11:56 2024 +0300

    KAFKA-14588 Log cleaner configuration move to CleanerConfig (#15387)
    
    In order to move ConfigCommand to tools we must move all it's dependencies 
which includes KafkaConfig and other core classes to java. This PR moves log 
cleaner configuration to CleanerConfig class of storage module.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |  2 +
 checkstyle/import-control-core.xml                 |  1 +
 checkstyle/import-control.xml                      |  2 +
 .../util/clusters/EmbeddedKafkaCluster.java        |  3 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     | 12 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala | 76 +++++++---------------
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  3 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 12 ++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 32 ++++-----
 .../LogCleanerParameterizedIntegrationTest.scala   | 12 ++--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  8 +--
 .../unit/kafka/server/ControllerApisTest.scala     | 13 ++--
 .../kafka/server/DynamicBrokerConfigTest.scala     | 18 ++---
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 18 ++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 .../server/config/ServerTopicConfigSynonyms.java   |  2 +-
 .../org/apache/kafka/server/config/Defaults.java   |  7 --
 .../kafka/storage/internals/log/CleanerConfig.java | 42 ++++++++++++
 .../integration/utils/EmbeddedKafkaCluster.java    |  3 +-
 19 files changed, 141 insertions(+), 127 deletions(-)

diff --git a/build.gradle b/build.gradle
index f2ba22e86b3..7dc8f50342f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2177,6 +2177,7 @@ project(':streams') {
     testImplementation project(':core')
     testImplementation project(':tools')
     testImplementation project(':core').sourceSets.test.output
+    testImplementation project(':storage')
     testImplementation project(':server-common')
     testImplementation project(':server-common').sourceSets.test.output
     testImplementation libs.log4j
@@ -2975,6 +2976,7 @@ project(':connect:runtime') {
     testImplementation project(':metadata')
     testImplementation project(':core').sourceSets.test.output
     testImplementation project(':server-common')
+    testImplementation project(':storage')
     testImplementation project(':connect:test-plugins')
 
     testImplementation libs.easymock
diff --git a/checkstyle/import-control-core.xml 
b/checkstyle/import-control-core.xml
index 3f9a21fffc6..782d2fe4617 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -56,6 +56,7 @@
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.server.fault" />
+    <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
   </subpackage>
 
   <subpackage name="tools">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index caf1fe5ebe1..a52b3d94e32 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -410,6 +410,7 @@
       <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.tools" />
       <allow pkg="org.apache.kafka.server.config" />
+      <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
     </subpackage>
 
     <subpackage name="test">
@@ -601,6 +602,7 @@
         <allow pkg="com.fasterxml.jackson.core.type" />
         <allow pkg="org.apache.kafka.metadata" />
         <allow pkg="org.eclipse.jetty.client"/>
+        <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
       </subpackage>
     </subpackage>
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index e3694cae291..c15aa27ae59 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,7 +161,7 @@ public class EmbeddedKafkaCluster {
         putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
         putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
         // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, 
KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+        putIfAbsent(brokerConfig, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
 
         Object listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
         if (listenerConfig == null)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8098ea237e0..b653f40b287 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,13 +499,13 @@ class LogCleaner(initialConfig: CleanerConfig,
 
 object LogCleaner {
   val ReconfigurableConfigs: Set[String] = Set(
-    KafkaConfig.LogCleanerThreadsProp,
-    KafkaConfig.LogCleanerDedupeBufferSizeProp,
-    KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
-    KafkaConfig.LogCleanerIoBufferSizeProp,
+    CleanerConfig.LOG_CLEANER_THREADS_PROP,
+    CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
+    CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
+    CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
     KafkaConfig.MessageMaxBytesProp,
-    KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
-    KafkaConfig.LogCleanerBackoffMsProp
+    CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
+    CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
   )
 
   def cleanerConfig(config: KafkaConfig): CleanerConfig = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c6f51a000e2..eaddb047da8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.server.config.{Defaults, 
ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Csv
-import org.apache.kafka.storage.internals.log.{LogConfig, 
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
ProducerStateManagerConfig}
 import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
 import org.apache.zookeeper.client.ZKClientConfig
 
@@ -211,17 +211,6 @@ object KafkaConfig {
   val LogRetentionBytesProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_BYTES_CONFIG)
   val LogCleanupIntervalMsProp = LogConfigPrefix + 
"retention.check.interval.ms"
   val LogCleanupPolicyProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.CLEANUP_POLICY_CONFIG)
-  val LogCleanerThreadsProp = LogConfigPrefix + "cleaner.threads"
-  val LogCleanerIoMaxBytesPerSecondProp = LogConfigPrefix + 
"cleaner.io.max.bytes.per.second"
-  val LogCleanerDedupeBufferSizeProp = LogConfigPrefix + 
"cleaner.dedupe.buffer.size"
-  val LogCleanerIoBufferSizeProp = LogConfigPrefix + "cleaner.io.buffer.size"
-  val LogCleanerDedupeBufferLoadFactorProp = LogConfigPrefix + 
"cleaner.io.buffer.load.factor"
-  val LogCleanerBackoffMsProp = LogConfigPrefix + "cleaner.backoff.ms"
-  val LogCleanerMinCleanRatioProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG)
-  val LogCleanerEnableProp = LogConfigPrefix + "cleaner.enable"
-  val LogCleanerDeleteRetentionMsProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG)
-  val LogCleanerMinCompactionLagMsProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)
-  val LogCleanerMaxCompactionLagMsProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)
   val LogIndexSizeMaxBytesProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)
   val LogIndexIntervalBytesProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)
   val LogFlushIntervalMessagesProp = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG)
@@ -652,25 +641,6 @@ object KafkaConfig {
   val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
   val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log 
cleaner checks whether any log is eligible for deletion"
   val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond 
the retention window. A comma separated list of valid policies. Valid policies 
are: \"delete\" and \"compact\""
-  val LogCleanerThreadsDoc = "The number of background threads to use for log 
cleaning"
-  val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so 
that the sum of its read and write i/o will be less than this value on average"
-  val LogCleanerDedupeBufferSizeDoc = "The total memory used for log 
deduplication across all cleaner threads"
-  val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O 
buffers across all cleaner threads"
-  val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load 
factor. The percentage full the dedupe buffer can become. A higher value " +
-  "will allow more log to be cleaned at once but will lead to more hash 
collisions"
-  val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no 
logs to clean"
-  val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total 
log for a log to eligible for cleaning. " +
-    "If the " + LogCleanerMaxCompactionLagMsProp + " or the " + 
LogCleanerMinCompactionLagMsProp +
-    " configurations are also specified, then the log compactor considers the 
log eligible for compaction " +
-    "as soon as either: (i) the dirty ratio threshold has been met and the log 
has had dirty (uncompacted) " +
-    "records for at least the " + LogCleanerMinCompactionLagMsProp + " 
duration, or (ii) if the log has had " +
-    "dirty (uncompacted) records for at most the " + 
LogCleanerMaxCompactionLagMsProp + " period."
-  val LogCleanerEnableDoc = "Enable the log cleaner process to run on the 
server. Should be enabled if using any topics with a cleanup.policy=compact 
including the internal offsets topic. If disabled those topics will not be 
compacted and continually grow in size."
-  val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone 
message markers for log compacted topics. This setting also gives a bound " +
-    "on the time in which a consumer must complete a read if they begin from 
offset 0 to ensure that they get a valid snapshot of the final stage (otherwise 
 " +
-    "tombstones messages may be collected before a consumer completes their 
scan)."
-  val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will 
remain uncompacted in the log. Only applicable for logs that are being 
compacted."
-  val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will 
remain ineligible for compaction in the log. Only applicable for logs that are 
being compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to 
the offset index."
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a 
log partition before messages are flushed to disk."
@@ -1075,17 +1045,17 @@ object KafkaConfig {
       .define(LogRetentionBytesProp, LONG, LogConfig.DEFAULT_RETENTION_BYTES, 
HIGH, LogRetentionBytesDoc)
       .define(LogCleanupIntervalMsProp, LONG, 
Defaults.LOG_CLEANUP_INTERVAL_MS, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
       .define(LogCleanupPolicyProp, LIST, LogConfig.DEFAULT_CLEANUP_POLICY, 
ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, 
TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, LogCleanupPolicyDoc)
-      .define(LogCleanerThreadsProp, INT, Defaults.LOG_CLEANER_THREADS, 
atLeast(0), MEDIUM, LogCleanerThreadsDoc)
-      .define(LogCleanerIoMaxBytesPerSecondProp, DOUBLE, 
Defaults.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, 
LogCleanerIoMaxBytesPerSecondDoc)
-      .define(LogCleanerDedupeBufferSizeProp, LONG, 
Defaults.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, LogCleanerDedupeBufferSizeDoc)
-      .define(LogCleanerIoBufferSizeProp, INT, 
Defaults.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, 
LogCleanerIoBufferSizeDoc)
-      .define(LogCleanerDedupeBufferLoadFactorProp, DOUBLE, 
Defaults.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, 
LogCleanerDedupeBufferLoadFactorDoc)
-      .define(LogCleanerBackoffMsProp, LONG, Defaults.LOG_CLEANER_BACKOFF_MS, 
atLeast(0), MEDIUM, LogCleanerBackoffMsDoc)
-      .define(LogCleanerMinCleanRatioProp, DOUBLE, 
LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, 
LogCleanerMinCleanRatioDoc)
-      .define(LogCleanerEnableProp, BOOLEAN, Defaults.LOG_CLEANER_ENABLE, 
MEDIUM, LogCleanerEnableDoc)
-      .define(LogCleanerDeleteRetentionMsProp, LONG, 
LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, 
LogCleanerDeleteRetentionMsDoc)
-      .define(LogCleanerMinCompactionLagMsProp, LONG, 
LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, 
LogCleanerMinCompactionLagMsDoc)
-      .define(LogCleanerMaxCompactionLagMsProp, LONG, 
LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, 
LogCleanerMaxCompactionLagMsDoc)
+      .define(CleanerConfig.LOG_CLEANER_THREADS_PROP, INT, 
CleanerConfig.LOG_CLEANER_THREADS, atLeast(0), MEDIUM, 
CleanerConfig.LOG_CLEANER_THREADS_DOC)
+      .define(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, DOUBLE, 
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, 
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC)
+      .define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, LONG, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC)
+      .define(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, INT, 
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, 
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_DOC)
+      .define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, 
DOUBLE, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC)
+      .define(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, LONG, 
CleanerConfig.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, 
CleanerConfig.LOG_CLEANER_BACKOFF_MS_DOC)
+      .define(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, DOUBLE, 
LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, 
CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_DOC)
+      .define(CleanerConfig.LOG_CLEANER_ENABLE_PROP, BOOLEAN, 
CleanerConfig.LOG_CLEANER_ENABLE, MEDIUM, CleanerConfig.LOG_CLEANER_ENABLE_DOC)
+      .define(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, LONG, 
LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, 
CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_DOC)
+      .define(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG, 
LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, 
CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
+      .define(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG, 
LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, 
CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC)
       .define(LogIndexSizeMaxBytesProp, INT, 
LogConfig.DEFAULT_SEGMENT_INDEX_BYTES, atLeast(4), MEDIUM, 
LogIndexSizeMaxBytesDoc)
       .define(LogIndexIntervalBytesProp, INT, 
LogConfig.DEFAULT_INDEX_INTERVAL_BYTES, atLeast(0), MEDIUM, 
LogIndexIntervalBytesDoc)
       .define(LogFlushIntervalMessagesProp, LONG, 
LogConfig.DEFAULT_FLUSH_MESSAGES_INTERVAL, atLeast(1), HIGH, 
LogFlushIntervalMessagesDoc)
@@ -1652,7 +1622,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val logDirs = 
CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
   def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
   def logFlushIntervalMessages = 
getLong(KafkaConfig.LogFlushIntervalMessagesProp)
-  val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
+  val logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
   def numRecoveryThreadsPerDataDir = 
getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
   val logFlushSchedulerIntervalMs = 
getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
   val logFlushOffsetCheckpointIntervalMs = 
getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
@@ -1662,16 +1632,16 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
   val offsetsRetentionCheckIntervalMs = 
getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
   def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
-  val logCleanerDedupeBufferSize = 
getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
-  val logCleanerDedupeBufferLoadFactor = 
getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
-  val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
-  val logCleanerIoMaxBytesPerSecond = 
getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
-  def logCleanerDeleteRetentionMs = 
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
-  def logCleanerMinCompactionLagMs = 
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
-  def logCleanerMaxCompactionLagMs = 
getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
-  val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
-  def logCleanerMinCleanRatio = 
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
-  val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
+  val logCleanerDedupeBufferSize = 
getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
+  val logCleanerDedupeBufferLoadFactor = 
getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
+  val logCleanerIoBufferSize = 
getInt(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP)
+  val logCleanerIoMaxBytesPerSecond = 
getDouble(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP)
+  def logCleanerDeleteRetentionMs = 
getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP)
+  def logCleanerMinCompactionLagMs = 
getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP)
+  def logCleanerMaxCompactionLagMs = 
getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP)
+  val logCleanerBackoffMs = getLong(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP)
+  def logCleanerMinCleanRatio = 
getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP)
+  val logCleanerEnable = getBoolean(CleanerConfig.LOG_CLEANER_ENABLE_PROP)
   def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
   def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
   def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java 
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index fa11fa9b186..4c6d1f9ef4a 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -41,6 +41,7 @@ import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -192,7 +193,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             props.put(RaftConfig.QUORUM_VOTERS_CONFIG, 
uninitializedQuorumVotersString);
 
             // reduce log cleaner offset map memory usage
-            props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 
"2097152");
+            props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
"2097152");
 
             // Add associated broker node property overrides
             if (brokerNode != null) {
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 54be3337625..e084454f5ff 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, 
Uuid}
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 import org.apache.kafka.server.config.Defaults
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -447,7 +447,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       configs.get(brokerResource2).entries.size)
     assertEquals(brokers(2).config.brokerId.toString, 
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
     assertEquals(brokers(2).config.logCleanerThreads.toString,
-      
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
+      
configs.get(brokerResource2).get(CleanerConfig.LOG_CLEANER_THREADS_PROP).value)
 
     checkValidAlterConfigs(client, this, topicResource1, topicResource2)
   }
@@ -2532,7 +2532,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       .all().get(15, TimeUnit.SECONDS)
 
     val newLogCleanerDeleteRetention = new Properties
-    
newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, 
"34")
+    
newLogCleanerDeleteRetention.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
 "34")
     TestUtils.incrementalAlterConfigs(brokers, client, 
newLogCleanerDeleteRetention, perBrokerConfig = true)
       .all().get(15, TimeUnit.SECONDS)
 
@@ -2543,14 +2543,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         controllerServer.config.nodeId.toString)
       controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
         Collections.singletonMap(controllerNodeResource,
-          Collections.singletonMap(KafkaConfig.LogCleanerDeleteRetentionMsProp,
+          
Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
             new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), 
false).get()
       ensureConsistentKRaftMetadata()
     }
 
     waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
-      KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
-      s"Timed out waiting for change to 
${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
+      CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, 
"").toString.equals("34")),
+      s"Timed out waiting for change to 
${CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP}",
       waitTimeMs = 60000L)
 
     waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 4be668aa321..c984eae0272 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -254,7 +254,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, 
"1680000000")
     expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168")
     expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168")
-    expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1")
+    expectedProps.setProperty(CleanerConfig.LOG_CLEANER_THREADS_PROP, "1")
     val logRetentionMs = configEntry(configDesc, 
KafkaConfig.LogRetentionTimeMillisProp)
     verifyConfig(KafkaConfig.LogRetentionTimeMillisProp, logRetentionMs,
       isSensitive = false, isReadOnly = false, expectedProps)
@@ -264,8 +264,8 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     val logRollHours = configEntry(configDesc, 
KafkaConfig.LogRollTimeHoursProp)
     verifyConfig(KafkaConfig.LogRollTimeHoursProp, logRollHours,
       isSensitive = false, isReadOnly = true, expectedProps)
-    val logCleanerThreads = configEntry(configDesc, 
KafkaConfig.LogCleanerThreadsProp)
-    verifyConfig(KafkaConfig.LogCleanerThreadsProp, logCleanerThreads,
+    val logCleanerThreads = configEntry(configDesc, 
CleanerConfig.LOG_CLEANER_THREADS_PROP)
+    verifyConfig(CleanerConfig.LOG_CLEANER_THREADS_PROP, logCleanerThreads,
       isSensitive = false, isReadOnly = false, expectedProps)
 
     def synonymsList(configEntry: ConfigEntry): List[(String, ConfigSource)] =
@@ -278,7 +278,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       (KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)),
       synonymsList(logRetentionHours))
     assertEquals(List((KafkaConfig.LogRollTimeHoursProp, 
ConfigSource.DEFAULT_CONFIG)), synonymsList(logRollHours))
-    assertEquals(List((KafkaConfig.LogCleanerThreadsProp, 
ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
+    assertEquals(List((CleanerConfig.LOG_CLEANER_THREADS_PROP, 
ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -536,19 +536,19 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
 
     val props = new Properties
-    props.put(KafkaConfig.LogCleanerThreadsProp, "2")
-    props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
-    props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
-    props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
+    props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, "2")
+    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "20000000")
+    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, "0.8")
+    props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, "300000")
     props.put(KafkaConfig.MessageMaxBytesProp, "40000")
-    props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
-    props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
+    props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, 
"50000000")
+    props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "6000")
 
     // Verify cleaner config was updated. Wait for one of the configs to be 
updated and verify
     // that all other others were updated at the same time since they are 
reconfigured together
     var newCleanerConfig: CleanerConfig = null
     TestUtils.waitUntilTrue(() => {
-      reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerThreadsProp, "2"))
+      reconfigureServers(props, perBrokerConfig = false, 
(CleanerConfig.LOG_CLEANER_THREADS_PROP, "2"))
       newCleanerConfig = servers.head.logManager.cleaner.currentConfig
       newCleanerConfig.numThreads == 2
     }, "Log cleaner not reconfigured", 60000)
@@ -566,8 +566,8 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     def cleanerThreads = 
Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
     cleanerThreads.take(2).foreach(_.interrupt())
     TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, 
"Threads did not exit")
-    props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
-    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerBackoffMsProp, "8000"))
+    props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "8000")
+    reconfigureServers(props, perBrokerConfig = false, 
(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "8000"))
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
 
     // Verify that produce/consume worked throughout this test without any 
retries in producer
@@ -635,10 +635,10 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     props.put(KafkaConfig.LogRetentionTimeMillisProp, 
TimeUnit.DAYS.toMillis(1).toString)
     props.put(KafkaConfig.MessageMaxBytesProp, "100000")
     props.put(KafkaConfig.LogIndexIntervalBytesProp, "10000")
-    props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, 
TimeUnit.DAYS.toMillis(1).toString)
-    props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, "60000")
+    props.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, 
TimeUnit.DAYS.toMillis(1).toString)
+    props.put(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, "60000")
     props.put(KafkaConfig.LogDeleteDelayMsProp, "60000")
-    props.put(KafkaConfig.LogCleanerMinCleanRatioProp, "0.3")
+    props.put(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, "0.3")
     props.put(KafkaConfig.LogCleanupPolicyProp, "delete")
     props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
     props.put(KafkaConfig.MinInSyncReplicasProp, "2")
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 8729045db7e..49e518ac2af 100755
--- 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -251,13 +251,13 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
 
     def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): 
KafkaConfig = {
       val props = TestUtils.createBrokerConfig(0, "localhost:2181")
-      props.put(KafkaConfig.LogCleanerThreadsProp, 
cleanerConfig.numThreads.toString)
-      props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, 
cleanerConfig.dedupeBufferSize.toString)
-      props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, 
cleanerConfig.dedupeBufferLoadFactor.toString)
-      props.put(KafkaConfig.LogCleanerIoBufferSizeProp, 
cleanerConfig.ioBufferSize.toString)
+      props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, 
cleanerConfig.numThreads.toString)
+      props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
cleanerConfig.dedupeBufferSize.toString)
+      props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, 
cleanerConfig.dedupeBufferLoadFactor.toString)
+      props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, 
cleanerConfig.ioBufferSize.toString)
       props.put(KafkaConfig.MessageMaxBytesProp, 
cleanerConfig.maxMessageSize.toString)
-      props.put(KafkaConfig.LogCleanerBackoffMsProp, 
cleanerConfig.backoffMs.toString)
-      props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 
cleanerConfig.maxIoBytesPerSecond.toString)
+      props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, 
cleanerConfig.backoffMs.toString)
+      props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, 
cleanerConfig.maxIoBytesPerSecond.toString)
       KafkaConfig.fromProps(props)
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 161493457e1..b3a72592196 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1959,7 +1959,7 @@ class LogCleanerTest extends Logging {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
     val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-    oldKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 
"10000000")
+    
oldKafkaProps.setProperty(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
 "10000000")
 
     val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
       logDirs = Array(TestUtils.tempDir()),
@@ -1973,14 +1973,14 @@ class LogCleanerTest extends Logging {
     }
 
     try {
-      assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, 
s"Throttler.desiredRatePerSec should be initialized from initial 
`${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+      assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, 
s"Throttler.desiredRatePerSec should be initialized from initial 
`${CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP}` config.")
 
       val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-      newKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 
"20000000")
+      
newKafkaProps.setProperty(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
 "20000000")
 
       logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new 
KafkaConfig(newKafkaProps))
 
-      assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, 
s"Throttler.desiredRatePerSec should be updated with new 
`${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+      assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, 
s"Throttler.desiredRatePerSec should be updated with new 
`${CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP}` config.")
     } finally {
       logCleaner.shutdown()
     }
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 00777e9677b..135de6e5144 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -55,6 +55,7 @@ import 
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, 
MetadataVersion, ProducerIdsBlock}
 import org.apache.kafka.server.util.FutureUtils
+import org.apache.kafka.storage.internals.log.CleanerConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -318,19 +319,19 @@ class ControllerApisTest {
           setResourceName("1").
           setResourceType(ConfigResource.Type.BROKER.id()).
           setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new 
OldAlterableConfig().
-            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
             setValue("100000")).iterator())),
         new OldAlterConfigsResource().
           setResourceName("2").
           setResourceType(ConfigResource.Type.BROKER.id()).
           setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new 
OldAlterableConfig().
-            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
             setValue("100000")).iterator())),
         new OldAlterConfigsResource().
           setResourceName("2").
           setResourceType(ConfigResource.Type.BROKER.id()).
           setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new 
OldAlterableConfig().
-            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
             setValue("100000")).iterator())),
         new OldAlterConfigsResource().
           setResourceName("baz").
@@ -472,7 +473,7 @@ class ControllerApisTest {
           setResourceName("1").
           setResourceType(ConfigResource.Type.BROKER.id()).
           setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
-            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
             setValue("100000").
             setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
         new AlterConfigsResource().
@@ -536,14 +537,14 @@ class ControllerApisTest {
           setResourceName("3").
           setResourceType(ConfigResource.Type.BROKER.id()).
           setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
-            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
             setValue("100000").
             setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
         new AlterConfigsResource().
           setResourceName("3").
           setResourceType(ConfigResource.Type.BROKER.id()).
           setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
-            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
             setValue("100000").
             setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
         new AlterConfigsResource().
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index be065f5b8ca..4e1c8eed276 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.server.config.Defaults
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.KafkaScheduler
-import org.apache.kafka.storage.internals.log.{LogConfig, 
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
ProducerStateManagerConfig}
 import org.apache.kafka.test.MockMetricsReporter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -215,7 +215,7 @@ class DynamicBrokerConfigTest {
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, 
nonDynamicProps)
 
     // Test update of configs with invalid type
-    val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
+    val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "invalid")
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, 
invalidProps)
 
     val excludedTopicConfig = Map(KafkaConfig.LogMessageFormatVersionProp -> 
"0.10.2")
@@ -225,21 +225,21 @@ class DynamicBrokerConfigTest {
   @Test
   def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
     val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
-    origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
+    origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
"100000000")
     val config = KafkaConfig(origProps)
     config.dynamicConfig.initialize(None, None)
 
     val validProps = Map.empty[String, String]
-    val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
+    val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20")
 
     def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = {
-      val cleanerThreads = 
configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt
+      val cleanerThreads = 
configs.get(CleanerConfig.LOG_CLEANER_THREADS_PROP).toString.toInt
       if (cleanerThreads <=0 || cleanerThreads >= 5)
         throw new ConfigException(s"Invalid cleaner threads $cleanerThreads")
     }
     val reconfigurable = new Reconfigurable {
       override def configure(configs: util.Map[String, _]): Unit = {}
-      override def reconfigurableConfigs(): util.Set[String] = 
Set(KafkaConfig.LogCleanerThreadsProp).asJava
+      override def reconfigurableConfigs(): util.Set[String] = 
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP).asJava
       override def validateReconfiguration(configs: util.Map[String, _]): Unit 
= validateLogCleanerConfig(configs)
       override def reconfigure(configs: util.Map[String, _]): Unit = {}
     }
@@ -248,7 +248,7 @@ class DynamicBrokerConfigTest {
     config.dynamicConfig.removeReconfigurable(reconfigurable)
 
     val brokerReconfigurable = new BrokerReconfigurable {
-      override def reconfigurableConfigs: collection.Set[String] = 
Set(KafkaConfig.LogCleanerThreadsProp)
+      override def reconfigurableConfigs: collection.Set[String] = 
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP)
       override def validateReconfiguration(newConfig: KafkaConfig): Unit = 
validateLogCleanerConfig(newConfig.originals)
       override def reconfigure(oldConfig: KafkaConfig, newConfig: 
KafkaConfig): Unit = {}
     }
@@ -260,8 +260,8 @@ class DynamicBrokerConfigTest {
   def testReconfigurableValidation(): Unit = {
     val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
     val config = KafkaConfig(origProps)
-    val invalidReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, 
KafkaConfig.BrokerIdProp, "some.prop")
-    val validReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, 
KafkaConfig.LogCleanerDedupeBufferSizeProp, "some.prop")
+    val invalidReconfigurableProps = 
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, KafkaConfig.BrokerIdProp, 
"some.prop")
+    val validReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "some.prop")
 
     def createReconfigurable(configs: Set[String]) = new Reconfigurable {
       override def configure(configs: util.Map[String, _]): Unit = {}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e9426245b14..a5d4d961fe1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 import org.junit.jupiter.api.function.Executable
 
 import scala.annotation.nowarn
@@ -845,14 +845,14 @@ class KafkaConfigTest {
         case KafkaConfig.LogRetentionBytesProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.LogCleanupIntervalMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case KafkaConfig.LogCleanupPolicyProp => 
assertPropertyInvalid(baseProperties, name, "unknown_policy", "0")
-        case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.LogCleanerDedupeBufferSizeProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
-        case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.LogCleanerEnableProp => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
-        case KafkaConfig.LogCleanerDeleteRetentionMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.LogCleanerMinCompactionLagMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.LogCleanerMaxCompactionLagMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.LogCleanerMinCleanRatioProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
+        case CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case CleanerConfig.LOG_CLEANER_ENABLE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.LogIndexSizeMaxBytesProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "3")
         case KafkaConfig.LogFlushIntervalMessagesProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case KafkaConfig.LogFlushSchedulerIntervalMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7b1ce80cbed..17ee3139a51 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -373,7 +373,7 @@ object TestUtils extends Logging {
     props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
     props.put(KafkaConfig.LogDeleteDelayMsProp, "1000")
     props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
-    props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
+    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152")
     props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     if (!props.containsKey(KafkaConfig.OffsetsTopicPartitionsProp))
       props.put(KafkaConfig.OffsetsTopicPartitionsProp, "5")
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 320de2db6b9..02cdc1cc875 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils;
 
 public final class ServerTopicConfigSynonyms {
     private static final String LOG_PREFIX = "log.";
-    private static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
+    public static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
 
     /**
      * Maps topic configurations to their equivalent broker configurations.
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java 
b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index e55f641790f..0c7ca123b87 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -106,13 +106,6 @@ public class Defaults {
     public static final int NUM_PARTITIONS = 1;
     public static final String LOG_DIR = "/tmp/kafka-logs";
     public static final long LOG_CLEANUP_INTERVAL_MS = 5 * 60 * 1000L;
-    public static final int LOG_CLEANER_THREADS = 1;
-    public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = 
Double.MAX_VALUE;
-    public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 * 
1024L;
-    public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024;
-    public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d;
-    public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000;
-    public static final boolean LOG_CLEANER_ENABLE = true;
     public static final int LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
     public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 
60000;
     public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
index 9e38d0c02e9..8168197fe08 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
@@ -16,11 +16,53 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+
 /**
  * Configuration parameters for the log cleaner.
  */
 public class CleanerConfig {
     public static final String HASH_ALGORITHM = "MD5";
+    public static final int LOG_CLEANER_THREADS = 1;
+    public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = 
Double.MAX_VALUE;
+    public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 * 
1024L;
+    public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024;
+    public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d;
+    public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000;
+    public static final boolean LOG_CLEANER_ENABLE = true;
+
+    public static final String LOG_CLEANER_THREADS_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "threads";
+    public static final String LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.max.bytes.per.second";
+    public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "dedupe.buffer.size";
+    public static final String LOG_CLEANER_IO_BUFFER_SIZE_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.buffer.size";
+    public static final String LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.buffer.load.factor";
+    public static final String LOG_CLEANER_BACKOFF_MS_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "backoff.ms";
+    public static final String LOG_CLEANER_MIN_CLEAN_RATIO_PROP = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG);
+    public static final String LOG_CLEANER_ENABLE_PROP = 
ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "enable";
+    public static final String LOG_CLEANER_DELETE_RETENTION_MS_PROP = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG);
+    public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
+    public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
+
+    public static final String LOG_CLEANER_MIN_CLEAN_RATIO_DOC = "The minimum 
ratio of dirty log to total log for a log to eligible for cleaning. " +
+            "If the " + LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP + " or the " + 
LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP +
+            " configurations are also specified, then the log compactor 
considers the log eligible for compaction " +
+            "as soon as either: (i) the dirty ratio threshold has been met and 
the log has had dirty (uncompacted) " +
+            "records for at least the " + 
LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP + " duration, or (ii) if the log has had 
" +
+            "dirty (uncompacted) records for at most the " + 
LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP + " period.";
+    public static final String LOG_CLEANER_THREADS_DOC = "The number of 
background threads to use for log cleaning";
+    public static final String LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC = "The 
log cleaner will be throttled so that the sum of its read and write i/o will be 
less than this value on average";
+    public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC = "The total 
memory used for log deduplication across all cleaner threads";
+    public static final String LOG_CLEANER_IO_BUFFER_SIZE_DOC = "The total 
memory used for log cleaner I/O buffers across all cleaner threads";
+    public static final String LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC = 
"Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer 
can become. A higher value " +
+            "will allow more log to be cleaned at once but will lead to more 
hash collisions";
+    public static final String LOG_CLEANER_BACKOFF_MS_DOC = "The amount of 
time to sleep when there are no logs to clean";
+    public static final String LOG_CLEANER_ENABLE_DOC = "Enable the log 
cleaner process to run on the server. Should be enabled if using any topics 
with a cleanup.policy=compact including the internal offsets topic. If disabled 
those topics will not be compacted and continually grow in size.";
+    public static final String LOG_CLEANER_DELETE_RETENTION_MS_DOC = "The 
amount of time to retain tombstone message markers for log compacted topics. 
This setting also gives a bound " +
+            "on the time in which a consumer must complete a read if they 
begin from offset 0 to ensure that they get a valid snapshot of the final stage 
(otherwise  " +
+            "tombstones messages may be collected before a consumer completes 
their scan).";
+    public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC = "The 
minimum time a message will remain uncompacted in the log. Only applicable for 
logs that are being compacted.";
+    public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC = "The 
maximum time a message will remain ineligible for compaction in the log. Only 
applicable for logs that are being compacted.";
 
     public final int numThreads;
     public final long dedupeBufferSize;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 3f3e9a243bd..4232e1d74c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.server.config.ConfigType;
 import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
@@ -111,7 +112,7 @@ public class EmbeddedKafkaCluster {
         brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
         putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), 
"PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
         putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, 
KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+        putIfAbsent(brokerConfig, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
         putIfAbsent(brokerConfig, KafkaConfig.GroupMinSessionTimeoutMsProp(), 
0);
         putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
         putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);

Reply via email to