This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70cecb6 KAFKA-6244; Dynamic update of log cleaner configuration (#4465)
70cecb6 is described below
commit 70cecb6881f8b4f45cfd4780ef27f9fb9a8ce0b6
Author: Rajini Sivaram <[email protected]>
AuthorDate: Fri Jan 26 14:38:46 2018 -0800
KAFKA-6244; Dynamic update of log cleaner configuration (#4465)
---
core/src/main/scala/kafka/log/LogCleaner.scala | 74 ++++++++++++++++++++--
core/src/main/scala/kafka/log/LogManager.scala | 11 +---
.../scala/kafka/server/DynamicBrokerConfig.scala | 70 ++++++++++++++++----
core/src/main/scala/kafka/server/KafkaServer.scala | 3 +
.../server/DynamicBrokerReconfigurationTest.scala | 53 ++++++++++++++++
.../log/AbstractLogCleanerIntegrationTest.scala | 3 +-
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 52 +++++++++++++++
.../kafka/server/DynamicBrokerConfigTest.scala | 17 ++---
8 files changed, 244 insertions(+), 39 deletions(-)
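As a hedged illustration of what this change enables (not part of the commit itself): once a broker is running, the cleaner settings listed above can be altered without a restart through the admin API. The sketch below uses the Java AdminClient from Scala; the bootstrap address and the chosen thread count are assumptions, and note that the non-incremental alterConfigs call replaces any previously set cluster-wide dynamic entries.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object AlterLogCleanerConfig extends App {
  // Assumed bootstrap address; adjust for the target cluster.
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = AdminClient.create(props)
  try {
    // An empty resource name targets the cluster-wide default broker config.
    val broker = new ConfigResource(ConfigResource.Type.BROKER, "")
    val update = new Config(Collections.singleton(new ConfigEntry("log.cleaner.threads", "2")))
    admin.alterConfigs(Collections.singletonMap(broker, update)).all().get()
  } finally admin.close()
}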
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 637e24c..e013cfb 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -20,13 +20,14 @@ package kafka.log
import java.io.{File, IOException}
import java.nio._
import java.nio.file.Files
+import java.util
import java.util.Date
import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.LogDirFailureChannel
+import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
@@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
-import scala.collection.mutable
+import scala.collection.{Set, mutable}
import scala.collection.JavaConverters._
/**
@@ -83,16 +84,20 @@ import scala.collection.JavaConverters._
* data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
* tombstone deletion.
*
- * @param config Configuration parameters for the cleaner
+ * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
* @param time A way to control the passage of time
*/
-class LogCleaner(val config: CleanerConfig,
+class LogCleaner(initialConfig: CleanerConfig,
val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log],
val logDirFailureChannel: LogDirFailureChannel,
- time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
+ time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
+{
+
+ /* Log cleaner configuration which may be dynamically updated */
+ @volatile private var config = initialConfig
/* for managing the state of partitions being cleaned. package-private to allow access in tests */
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
@@ -106,7 +111,7 @@ class LogCleaner(val config: CleanerConfig,
time = time)
/* the threads */
- private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
+ private val cleaners = mutable.ArrayBuffer[CleanerThread]()
/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
newGauge("max-buffer-utilization-percent",
@@ -133,7 +138,11 @@ class LogCleaner(val config: CleanerConfig,
*/
def startup() {
info("Starting the log cleaner")
- cleaners.foreach(_.start())
+ (0 until config.numThreads).foreach { i =>
+ val cleaner = new CleanerThread(i)
+ cleaners += cleaner
+ cleaner.start()
+ }
}
/**
@@ -142,6 +151,27 @@ class LogCleaner(val config: CleanerConfig,
def shutdown() {
info("Shutting down the log cleaner.")
cleaners.foreach(_.shutdown())
+ cleaners.clear()
+ }
+
+ override def reconfigurableConfigs(): Set[String] = {
+ LogCleaner.ReconfigurableConfigs
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
+ val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
+ val numThreads = newCleanerConfig.numThreads
+ numThreads >= 1 && numThreads >= config.numThreads / 2 && numThreads <= config.numThreads * 2
+ }
+
+ /**
+ * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+ * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
+ */
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+ config = LogCleaner.cleanerConfig(newConfig)
+ shutdown()
+ startup()
}
/**
@@ -210,6 +240,12 @@ class LogCleaner(val config: CleanerConfig,
isCleaned
}
+ // Only for testing
+ private[kafka] def currentConfig: CleanerConfig = config
+
+ // Only for testing
+ private[log] def cleanerCount: Int = cleaners.size
+
/**
* The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by
* choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
@@ -317,6 +353,30 @@ class LogCleaner(val config: CleanerConfig,
}
}
+object LogCleaner {
+ val ReconfigurableConfigs = Set(
+ KafkaConfig.LogCleanerThreadsProp,
+ KafkaConfig.LogCleanerDedupeBufferSizeProp,
+ KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
+ KafkaConfig.LogCleanerIoBufferSizeProp,
+ KafkaConfig.MessageMaxBytesProp,
+ KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
+ KafkaConfig.LogCleanerBackoffMsProp
+ )
+
+ def cleanerConfig(config: KafkaConfig): CleanerConfig = {
+ CleanerConfig(numThreads = config.logCleanerThreads,
+ dedupeBufferSize = config.logCleanerDedupeBufferSize,
+ dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
+ ioBufferSize = config.logCleanerIoBufferSize,
+ maxMessageSize = config.messageMaxBytes,
+ maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+ backOffMs = config.logCleanerBackoffMs,
+ enableCleaner = config.logCleanerEnable)
+
+ }
+}
+
/**
* This class holds the actual logic for cleaning a log
* @param id An identifier used for logging
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a7d106f..37a0be8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -893,18 +893,11 @@ object LogManager {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
+ // read the log configurations from zookeeper
val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
if (!failed.isEmpty) throw failed.head._2
- // read the log configurations from zookeeper
- val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
- dedupeBufferSize = config.logCleanerDedupeBufferSize,
- dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
- ioBufferSize = config.logCleanerIoBufferSize,
- maxMessageSize = config.messageMaxBytes,
- maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
- backOffMs = config.logCleanerBackoffMs,
- enableCleaner = config.logCleanerEnable)
+ val cleanerConfig = LogCleaner.cleanerConfig(config)
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f307b8d..2c186d3 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,6 +22,7 @@ import java.util
import java.util.Properties
import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.log.LogCleaner
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -77,6 +78,7 @@ object DynamicBrokerConfig {
val AllDynamicConfigs = mutable.Set[String]()
AllDynamicConfigs ++= DynamicSecurityConfigs
+ AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
private val PerBrokerConfigs = DynamicSecurityConfigs
@@ -115,6 +117,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val dynamicDefaultConfigs = mutable.Map[String, String]()
private val brokerId = kafkaConfig.brokerId
private val reconfigurables = mutable.Buffer[Reconfigurable]()
+ private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig = kafkaConfig
@@ -124,11 +127,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
}
+ def addReconfigurables(kafkaServer: KafkaServer): Unit = {
+ if (kafkaServer.logManager.cleaner != null)
+ addBrokerReconfigurable(kafkaServer.logManager.cleaner)
+ }
+
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
reconfigurables += reconfigurable
}
+ def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
+ require(reconfigurable.reconfigurableConfigs.forall(AllDynamicConfigs.contains))
+ brokerReconfigurables += reconfigurable
+ }
+
def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
reconfigurables -= reconfigurable
}
@@ -327,9 +340,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
val updatedKeys = updatedConfigs(newValues, oldValues).keySet
- processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+ if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
+ processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly)
case reconfigurable =>
- processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+ if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
+ processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+ }
+ brokerReconfigurables.foreach { reconfigurable =>
+ if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet))
+ processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly)
}
newConfig
} catch {
@@ -343,18 +362,41 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
currentConfig
}
- private def processReconfigurable(reconfigurable: Reconfigurable, updatedKeys: Set[String],
- allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object],
+ private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
+ reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
+ }
+
+ private def processReconfigurable(reconfigurable: Reconfigurable,
+ allNewConfigs: util.Map[String, _],
+ newCustomConfigs: util.Map[String, Object],
validateOnly: Boolean): Unit = {
- if (reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) {
- val newConfigs = new util.HashMap[String, Object]
- allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
- newConfigs.putAll(newCustomConfigs)
- if (validateOnly) {
- if (!reconfigurable.validateReconfiguration(newConfigs))
- throw new ConfigException("Validation of dynamic config update failed")
- } else
- reconfigurable.reconfigure(newConfigs)
- }
+ val newConfigs = new util.HashMap[String, Object]
+ allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
+ newConfigs.putAll(newCustomConfigs)
+ if (validateOnly) {
+ if (!reconfigurable.validateReconfiguration(newConfigs))
+ throw new ConfigException("Validation of dynamic config update failed")
+ } else
+ reconfigurable.reconfigure(newConfigs)
}
+
+ private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
+ oldConfig: KafkaConfig,
+ newConfig: KafkaConfig,
+ validateOnly: Boolean): Unit = {
+ if (validateOnly) {
+ if (!reconfigurable.validateReconfiguration(newConfig))
+ throw new ConfigException("Validation of dynamic config update failed")
+ } else
+ reconfigurable.reconfigure(oldConfig, newConfig)
+ }
+}
+
+trait BrokerReconfigurable {
+
+ def reconfigurableConfigs: Set[String]
+
+ def validateReconfiguration(newConfig: KafkaConfig): Boolean
+
+ def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 80b0eb7..c4123f1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -290,6 +290,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
Mx4jLoader.maybeLoad()
+ /* Add all reconfigurables for config change notification before starting config handlers */
+ config.dynamicConfig.addReconfigurables(this)
+
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a760d7d..c6f023f 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -242,6 +242,46 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
}
+ @Test
+ def testLogCleanerConfig(): Unit = {
+ val (producerThread, consumerThread) = startProduceConsume(0)
+
+ verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
+
+ val props = new Properties
+ props.put(KafkaConfig.LogCleanerThreadsProp, "2")
+ props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
+ props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
+ props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
+ props.put(KafkaConfig.MessageMaxBytesProp, "40000")
+ props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
+ props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
+ reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))
+
+ // Verify cleaner config was updated
+ val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
+ assertEquals(2, newCleanerConfig.numThreads)
+ assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
+ assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
+ assertEquals(300000, newCleanerConfig.ioBufferSize)
+ assertEquals(40000, newCleanerConfig.maxMessageSize)
+ assertEquals(50000000, newCleanerConfig.maxIoBytesPerSecond, 50000000)
+ assertEquals(6000, newCleanerConfig.backOffMs)
+
+ // Verify thread count
+ verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+
+ // Stop a couple of threads and verify they are recreated if any config is updated
+ def cleanerThreads = Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
+ cleanerThreads.take(2).foreach(_.interrupt())
+ TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, "Threads did not exit")
+ props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
+ reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerBackoffMsProp, "8000"))
+ verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+
+ stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ }
+
private def createProducer(trustStore: File, retries: Int,
clientId: String = "test-producer"): KafkaProducer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
@@ -411,6 +451,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props
}
+ private def currentThreads: List[String] = {
+ Thread.getAllStackTraces.keySet.asScala.toList.map(_.getName)
+ }
+
+ private def verifyThreads(threadPrefix: String, countPerBroker: Int): Unit = {
+ val expectedCount = countPerBroker * servers.size
+ val (threads, resized) = TestUtils.computeUntilTrue(currentThreads.filter(_.startsWith(threadPrefix))) {
+ _.size == expectedCount
+ }
+ assertTrue(s"Invalid threads: expected $expectedCount, got
${threads.size}: $threads", resized)
+ }
+
+
private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = {
val producerThread = new ProducerThread(retries)
clientThreads += producerThread
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index bff2700..0ad5b46 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -79,6 +79,7 @@ abstract class AbstractLogCleanerIntegrationTest {
compactionLag: Long = defaultCompactionLag,
deleteDelay: Int = defaultDeleteDelay,
segmentSize: Int = defaultSegmentSize,
+ cleanerIoBufferSize: Option[Int] = None,
propertyOverrides: Properties = new Properties()): LogCleaner = {
val logMap = new Pool[TopicPartition, Log]()
@@ -108,7 +109,7 @@ abstract class AbstractLogCleanerIntegrationTest {
val cleanerConfig = CleanerConfig(
numThreads = numThreads,
- ioBufferSize = maxMessageSize / 2,
+ ioBufferSize = cleanerIoBufferSize.getOrElse(maxMessageSize / 2),
maxMessageSize = maxMessageSize,
backOffMs = backOffMs)
new LogCleaner(cleanerConfig,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index b20622f..22d7e77 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,10 +18,12 @@
package kafka.log
import java.io.File
+import java.util
import java.util.Properties
import kafka.api.KAFKA_0_11_0_IV0
import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.server.KafkaConfig
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
@@ -227,6 +229,56 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
checkLogAfterAppendingDups(log, startSize, appends)
}
+ @Test
+ def cleanerConfigUpdateTest() {
+ val largeMessageKey = 20
+ val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+ val maxMessageSize = largeMessageSet.sizeInBytes
+
+ cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize,
+ cleanerIoBufferSize = Some(1))
+ val log = cleaner.logs.get(topicPartitions(0))
+
+ val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
+ val startSize = log.size
+ cleaner.startup()
+ assertEquals(1, cleaner.cleanerCount)
+
+ // Verify no cleaning with LogCleanerIoBufferSizeProp=1
+ val firstDirty = log.activeSegment.baseOffset
+ val topicPartition = new TopicPartition("log", 0)
+ cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
+ assertTrue("Should not have cleaned",
cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)
+
+ def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
+ val props = TestUtils.createBrokerConfig(0, "localhost:2181")
+ props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
+ props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
+ props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
+ props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
+ props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
+ props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString)
+ props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
+ KafkaConfig.fromProps(props)
+ }
+
+ // Verify cleaning done with larger LogCleanerIoBufferSizeProp
+ val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
+ val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
+ dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
+ dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
+ ioBufferSize = 100000,
+ maxMessageSize = cleaner.currentConfig.maxMessageSize,
+ maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
+ backOffMs = cleaner.currentConfig.backOffMs))
+ cleaner.reconfigure(oldConfig, newConfig)
+
+ assertEquals(2, cleaner.cleanerCount)
+ checkLastCleaned("log", 0, firstDirty)
+ val compactedSize = log.logSegments.map(_.size).sum
+ assertTrue(s"log should have been compacted: startSize=$startSize
compactedSize=$compactedSize", startSize > compactedSize)
+ }
+
private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
// wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
// LogConfig.MinCleanableDirtyRatioProp
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2032011..6dedbe0 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -104,23 +104,24 @@ class DynamicBrokerConfigTest {
verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+
+ val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
+ verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
}
@Test
def testSecurityConfigs(): Unit = {
- def verifyUpdate(name: String, value: Object, invalidValue: Boolean): Unit = {
+ def verifyUpdate(name: String, value: Object): Unit = {
verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = true)
- verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = invalidValue)
+ verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = false)
verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = true)
verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = false, expectFailure = true)
}
- verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks", invalidValue = false)
- verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS", invalidValue = false)
- verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password", invalidValue = false)
- verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password", invalidValue = false)
- verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true)
- verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true)
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks")
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
+ verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password")
+ verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
}
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
--
To stop receiving notification emails like this one, please contact
[email protected].