This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70cecb6  KAFKA-6244; Dynamic update of log cleaner configuration (#4465)
70cecb6 is described below

commit 70cecb6881f8b4f45cfd4780ef27f9fb9a8ce0b6
Author: Rajini Sivaram <[email protected]>
AuthorDate: Fri Jan 26 14:38:46 2018 -0800

    KAFKA-6244; Dynamic update of log cleaner configuration (#4465)
---
 core/src/main/scala/kafka/log/LogCleaner.scala     | 74 ++++++++++++++++++++--
 core/src/main/scala/kafka/log/LogManager.scala     | 11 +---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 70 ++++++++++++++++----
 core/src/main/scala/kafka/server/KafkaServer.scala |  3 +
 .../server/DynamicBrokerReconfigurationTest.scala  | 53 ++++++++++++++++
 .../log/AbstractLogCleanerIntegrationTest.scala    |  3 +-
 .../unit/kafka/log/LogCleanerIntegrationTest.scala | 52 +++++++++++++++
 .../kafka/server/DynamicBrokerConfigTest.scala     | 17 ++---
 8 files changed, 244 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 637e24c..e013cfb 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -20,13 +20,14 @@ package kafka.log
 import java.io.{File, IOException}
 import java.nio._
 import java.nio.file.Files
+import java.util
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.LogDirFailureChannel
+import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
@@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
-import scala.collection.mutable
+import scala.collection.{Set, mutable}
 import scala.collection.JavaConverters._
 
 /**
@@ -83,16 +84,20 @@ import scala.collection.JavaConverters._
  *    data from the transaction prior to reaching the offset of the marker. 
This follows the same logic used for
  *    tombstone deletion.
  *
- * @param config Configuration parameters for the cleaner
+ * @param initialConfig Initial configuration parameters for the cleaner. 
Actual config may be dynamically updated.
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
  * @param time A way to control the passage of time
  */
-class LogCleaner(val config: CleanerConfig,
+class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
                  val logs: Pool[TopicPartition, Log],
                  val logDirFailureChannel: LogDirFailureChannel,
-                 time: Time = Time.SYSTEM) extends Logging with 
KafkaMetricsGroup {
+                 time: Time = Time.SYSTEM) extends Logging with 
KafkaMetricsGroup with BrokerReconfigurable
+{
+
+  /* Log cleaner configuration which may be dynamically updated */
+  @volatile private var config = initialConfig
 
   /* for managing the state of partitions being cleaned. package-private to 
allow access in tests */
   private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, 
logDirFailureChannel)
@@ -106,7 +111,7 @@ class LogCleaner(val config: CleanerConfig,
                                         time = time)
 
   /* the threads */
-  private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
+  private val cleaners = mutable.ArrayBuffer[CleanerThread]()
 
   /* a metric to track the maximum utilization of any thread's buffer in the 
last cleaning */
   newGauge("max-buffer-utilization-percent",
@@ -133,7 +138,11 @@ class LogCleaner(val config: CleanerConfig,
    */
   def startup() {
     info("Starting the log cleaner")
-    cleaners.foreach(_.start())
+    (0 until config.numThreads).foreach { i =>
+      val cleaner = new CleanerThread(i)
+      cleaners += cleaner
+      cleaner.start()
+    }
   }
 
   /**
@@ -142,6 +151,27 @@ class LogCleaner(val config: CleanerConfig,
   def shutdown() {
     info("Shutting down the log cleaner.")
     cleaners.foreach(_.shutdown())
+    cleaners.clear()
+  }
+
+  override def reconfigurableConfigs(): Set[String] = {
+    LogCleaner.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
+    val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
+    val numThreads = newCleanerConfig.numThreads
+    numThreads >= 1 && numThreads >= config.numThreads / 2 && numThreads <= 
config.numThreads * 2
+  }
+
+  /**
+    * Reconfigure log clean config. This simply stops current log cleaners and 
creates new ones.
+    * That ensures that if any of the cleaners had failed, new cleaners are 
created to match the new config.
+    */
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+    config = LogCleaner.cleanerConfig(newConfig)
+    shutdown()
+    startup()
   }
 
   /**
@@ -210,6 +240,12 @@ class LogCleaner(val config: CleanerConfig,
     isCleaned
   }
 
+  // Only for testing
+  private[kafka] def currentConfig: CleanerConfig = config
+
+  // Only for testing
+  private[log] def cleanerCount: Int = cleaners.size
+
   /**
    * The cleaner threads do the actual log cleaning. Each thread processes 
does its cleaning repeatedly by
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned 
segments.
@@ -317,6 +353,30 @@ class LogCleaner(val config: CleanerConfig,
   }
 }
 
+object LogCleaner {
+  val ReconfigurableConfigs = Set(
+    KafkaConfig.LogCleanerThreadsProp,
+    KafkaConfig.LogCleanerDedupeBufferSizeProp,
+    KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
+    KafkaConfig.LogCleanerIoBufferSizeProp,
+    KafkaConfig.MessageMaxBytesProp,
+    KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
+    KafkaConfig.LogCleanerBackoffMsProp
+  )
+
+  def cleanerConfig(config: KafkaConfig): CleanerConfig = {
+    CleanerConfig(numThreads = config.logCleanerThreads,
+      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
+      ioBufferSize = config.logCleanerIoBufferSize,
+      maxMessageSize = config.messageMaxBytes,
+      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+      backOffMs = config.logCleanerBackoffMs,
+      enableCleaner = config.logCleanerEnable)
+
+  }
+}
+
 /**
  * This class holds the actual logic for cleaning a log
  * @param id An identifier used for logging
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a7d106f..37a0be8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -893,18 +893,11 @@ object LogManager {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
+    // read the log configurations from zookeeper
     val (topicConfigs, failed) = 
zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
     if (!failed.isEmpty) throw failed.head._2
 
-    // read the log configurations from zookeeper
-    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
-      dedupeBufferSize = config.logCleanerDedupeBufferSize,
-      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
-      ioBufferSize = config.logCleanerIoBufferSize,
-      maxMessageSize = config.messageMaxBytes,
-      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
-      backOffMs = config.logCleanerBackoffMs,
-      enableCleaner = config.logCleanerEnable)
+    val cleanerConfig = LogCleaner.cleanerConfig(config)
 
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f307b8d..2c186d3 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,6 +22,7 @@ import java.util
 import java.util.Properties
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import kafka.log.LogCleaner
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -77,6 +78,7 @@ object DynamicBrokerConfig {
 
   val AllDynamicConfigs = mutable.Set[String]()
   AllDynamicConfigs ++= DynamicSecurityConfigs
+  AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
 
   private val PerBrokerConfigs = DynamicSecurityConfigs
 
@@ -115,6 +117,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
   private val brokerId = kafkaConfig.brokerId
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
+  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig = kafkaConfig
 
@@ -124,11 +127,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     updateBrokerConfig(brokerId, 
adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
   }
 
+  def addReconfigurables(kafkaServer: KafkaServer): Unit = {
+    if (kafkaServer.logManager.cleaner != null)
+      addBrokerReconfigurable(kafkaServer.logManager.cleaner)
+  }
+
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     
require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
     reconfigurables += reconfigurable
   }
 
+  def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+    
require(reconfigurable.reconfigurableConfigs.forall(AllDynamicConfigs.contains))
+    brokerReconfigurables += reconfigurable
+  }
+
   def removeReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     reconfigurables -= reconfigurable
   }
@@ -327,9 +340,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
             val oldValues = 
currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
             val newValues = 
newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
             val updatedKeys = updatedConfigs(newValues, oldValues).keySet
-            processReconfigurable(listenerReconfigurable, updatedKeys, 
newValues, customConfigs, validateOnly)
+            if 
(needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, 
updatedKeys))
+              processReconfigurable(listenerReconfigurable, newValues, 
customConfigs, validateOnly)
           case reconfigurable =>
-            processReconfigurable(reconfigurable, updatedMap.keySet, 
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, 
updatedMap.keySet))
+              processReconfigurable(reconfigurable, 
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+        }
+        brokerReconfigurables.foreach { reconfigurable =>
+          if 
(needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, 
updatedMap.keySet))
+            processBrokerReconfigurable(reconfigurable, currentConfig, 
newConfig, validateOnly)
         }
         newConfig
       } catch {
@@ -343,18 +362,41 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       currentConfig
   }
 
-  private def processReconfigurable(reconfigurable: Reconfigurable, 
updatedKeys: Set[String],
-                                    allNewConfigs: util.Map[String, _], 
newCustomConfigs: util.Map[String, Object],
+  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], 
updatedKeys: Set[String]): Boolean = {
+    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
+  }
+
+  private def processReconfigurable(reconfigurable: Reconfigurable,
+                                    allNewConfigs: util.Map[String, _],
+                                    newCustomConfigs: util.Map[String, Object],
                                     validateOnly: Boolean): Unit = {
-    if 
(reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) {
-      val newConfigs = new util.HashMap[String, Object]
-      allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, 
v.asInstanceOf[AnyRef]) }
-      newConfigs.putAll(newCustomConfigs)
-      if (validateOnly) {
-        if (!reconfigurable.validateReconfiguration(newConfigs))
-          throw new ConfigException("Validation of dynamic config update 
failed")
-      } else
-        reconfigurable.reconfigure(newConfigs)
-    }
+    val newConfigs = new util.HashMap[String, Object]
+    allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, 
v.asInstanceOf[AnyRef]) }
+    newConfigs.putAll(newCustomConfigs)
+    if (validateOnly) {
+      if (!reconfigurable.validateReconfiguration(newConfigs))
+        throw new ConfigException("Validation of dynamic config update failed")
+    } else
+      reconfigurable.reconfigure(newConfigs)
   }
+
+  private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
+                                          oldConfig: KafkaConfig,
+                                          newConfig: KafkaConfig,
+                                          validateOnly: Boolean): Unit = {
+    if (validateOnly) {
+      if (!reconfigurable.validateReconfiguration(newConfig))
+        throw new ConfigException("Validation of dynamic config update failed")
+    } else
+      reconfigurable.reconfigure(oldConfig, newConfig)
+  }
+}
+
+trait BrokerReconfigurable {
+
+  def reconfigurableConfigs: Set[String]
+
+  def validateReconfiguration(newConfig: KafkaConfig): Boolean
+
+  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 80b0eb7..c4123f1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -290,6 +290,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         Mx4jLoader.maybeLoad()
 
+        /* Add all reconfigurables for config change notification before 
starting config handlers */
+        config.dynamicConfig.addReconfigurables(this)
+
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> 
new TopicConfigHandler(logManager, config, quotaManagers),
                                                            ConfigType.Client 
-> new ClientIdConfigHandler(quotaManagers),
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a760d7d..c6f023f 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -242,6 +242,46 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
   }
 
+  @Test
+  def testLogCleanerConfig(): Unit = {
+    val (producerThread, consumerThread) = startProduceConsume(0)
+
+    verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
+
+    val props = new Properties
+    props.put(KafkaConfig.LogCleanerThreadsProp, "2")
+    props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
+    props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
+    props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
+    props.put(KafkaConfig.MessageMaxBytesProp, "40000")
+    props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
+    props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerThreadsProp, "2"))
+
+    // Verify cleaner config was updated
+    val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
+    assertEquals(2, newCleanerConfig.numThreads)
+    assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
+    assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
+    assertEquals(300000, newCleanerConfig.ioBufferSize)
+    assertEquals(40000, newCleanerConfig.maxMessageSize)
+    assertEquals(50000000, newCleanerConfig.maxIoBytesPerSecond, 50000000)
+    assertEquals(6000, newCleanerConfig.backOffMs)
+
+    // Verify thread count
+    verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+
+    // Stop a couple of threads and verify they are recreated if any config is 
updated
+    def cleanerThreads = 
Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
+    cleanerThreads.take(2).foreach(_.interrupt())
+    TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, 
"Threads did not exit")
+    props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerBackoffMsProp, "8000"))
+    verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+
+    stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
+  }
+
   private def createProducer(trustStore: File, retries: Int,
                              clientId: String = "test-producer"): 
KafkaProducer[String, String] = {
     val bootstrapServers = TestUtils.bootstrapServers(servers, new 
ListenerName(SecureExternal))
@@ -411,6 +451,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props
   }
 
+  private def currentThreads: List[String] = {
+    Thread.getAllStackTraces.keySet.asScala.toList.map(_.getName)
+  }
+
+  private def verifyThreads(threadPrefix: String, countPerBroker: Int): Unit = 
{
+    val expectedCount = countPerBroker * servers.size
+    val (threads, resized) = 
TestUtils.computeUntilTrue(currentThreads.filter(_.startsWith(threadPrefix))) {
+      _.size == expectedCount
+    }
+    assertTrue(s"Invalid threads: expected $expectedCount, got 
${threads.size}: $threads", resized)
+  }
+
+
   private def startProduceConsume(retries: Int): (ProducerThread, 
ConsumerThread) = {
     val producerThread = new ProducerThread(retries)
     clientThreads += producerThread
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index bff2700..0ad5b46 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -79,6 +79,7 @@ abstract class AbstractLogCleanerIntegrationTest {
                   compactionLag: Long = defaultCompactionLag,
                   deleteDelay: Int = defaultDeleteDelay,
                   segmentSize: Int = defaultSegmentSize,
+                  cleanerIoBufferSize: Option[Int] = None,
                   propertyOverrides: Properties = new Properties()): 
LogCleaner = {
 
     val logMap = new Pool[TopicPartition, Log]()
@@ -108,7 +109,7 @@ abstract class AbstractLogCleanerIntegrationTest {
 
     val cleanerConfig = CleanerConfig(
       numThreads = numThreads,
-      ioBufferSize = maxMessageSize / 2,
+      ioBufferSize = cleanerIoBufferSize.getOrElse(maxMessageSize / 2),
       maxMessageSize = maxMessageSize,
       backOffMs = backOffMs)
     new LogCleaner(cleanerConfig,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index b20622f..22d7e77 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,10 +18,12 @@
 package kafka.log
 
 import java.io.File
+import java.util
 import java.util.Properties
 
 import kafka.api.KAFKA_0_11_0_IV0
 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.server.KafkaConfig
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -227,6 +229,56 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     checkLogAfterAppendingDups(log, startSize, appends)
   }
 
+  @Test
+  def cleanerConfigUpdateTest() {
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+    val maxMessageSize = largeMessageSet.sizeInBytes
+
+    cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, 
maxMessageSize = maxMessageSize,
+      cleanerIoBufferSize = Some(1))
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = 
codec)
+    val startSize = log.size
+    cleaner.startup()
+    assertEquals(1, cleaner.cleanerCount)
+
+    // Verify no cleaning with LogCleanerIoBufferSizeProp=1
+    val firstDirty = log.activeSegment.baseOffset
+    val topicPartition = new TopicPartition("log", 0)
+    cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
+    assertTrue("Should not have cleaned", 
cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)
+
+    def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): 
KafkaConfig = {
+      val props = TestUtils.createBrokerConfig(0, "localhost:2181")
+      props.put(KafkaConfig.LogCleanerThreadsProp, 
cleanerConfig.numThreads.toString)
+      props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, 
cleanerConfig.dedupeBufferSize.toString)
+      props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, 
cleanerConfig.dedupeBufferLoadFactor.toString)
+      props.put(KafkaConfig.LogCleanerIoBufferSizeProp, 
cleanerConfig.ioBufferSize.toString)
+      props.put(KafkaConfig.MessageMaxBytesProp, 
cleanerConfig.maxMessageSize.toString)
+      props.put(KafkaConfig.LogCleanerBackoffMsProp, 
cleanerConfig.backOffMs.toString)
+      props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 
cleanerConfig.maxIoBytesPerSecond.toString)
+      KafkaConfig.fromProps(props)
+    }
+
+    // Verify cleaning done with larger LogCleanerIoBufferSizeProp
+    val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
+    val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
+      dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
+      dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
+      ioBufferSize = 100000,
+      maxMessageSize = cleaner.currentConfig.maxMessageSize,
+      maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
+      backOffMs = cleaner.currentConfig.backOffMs))
+    cleaner.reconfigure(oldConfig, newConfig)
+
+    assertEquals(2, cleaner.cleanerCount)
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize 
compactedSize=$compactedSize", startSize > compactedSize)
+  }
+
   private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: 
Long) {
     // wait until cleaning up to base_offset, note that cleaning happens only 
when "log dirty ratio" is higher than
     // LogConfig.MinCleanableDirtyRatioProp
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2032011..6dedbe0 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -104,23 +104,24 @@ class DynamicBrokerConfigTest {
     verifyConfigUpdateWithInvalidConfig(validProps, 
securityPropsWithoutListenerPrefix)
     val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
     verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+
+    val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
+    verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
   }
 
   @Test
   def testSecurityConfigs(): Unit = {
-    def verifyUpdate(name: String, value: Object, invalidValue: Boolean): Unit 
= {
+    def verifyUpdate(name: String, value: Object): Unit = {
       verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = 
true)
-      verifyConfigUpdate(s"listener.name.external.$name", value, 
perBrokerConfig = true, expectFailure = invalidValue)
+      verifyConfigUpdate(s"listener.name.external.$name", value, 
perBrokerConfig = true, expectFailure = false)
       verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = 
true)
       verifyConfigUpdate(s"listener.name.external.$name", value, 
perBrokerConfig = false, expectFailure = true)
     }
 
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks", 
invalidValue = false)
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS", invalidValue = 
false)
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password", 
invalidValue = false)
-    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password", invalidValue 
= false)
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
1.asInstanceOf[Integer], invalidValue = true)
-    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 1.asInstanceOf[Integer], 
invalidValue = true)
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks")
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password")
+    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
   }
 
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: 
Boolean, expectFailure: Boolean) {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to