This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9b3722c KAFKA-6245: Dynamic update of topic config defaults (#4466)
9b3722c is described below
commit 9b3722cea1845784853073d2a22764ec9fd5a8e5
Author: Rajini Sivaram <[email protected]>
AuthorDate: Sat Jan 27 12:22:05 2018 -0800
KAFKA-6245: Dynamic update of topic config defaults (#4466)
Dynamic update of default topic configs as described in KIP-226.
---
core/src/main/scala/kafka/cluster/Partition.scala | 5 +-
core/src/main/scala/kafka/log/Log.scala | 13 +-
core/src/main/scala/kafka/log/LogConfig.scala | 6 +-
core/src/main/scala/kafka/log/LogManager.scala | 29 ++++-
.../main/scala/kafka/server/ConfigHandler.scala | 11 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 51 +++++++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 43 ++++---
.../kafka/api/AdminClientIntegrationTest.scala | 4 +-
.../server/DynamicBrokerReconfigurationTest.scala | 132 ++++++++++++++++++++-
.../unit/kafka/integration/MinIsrConfigTest.scala | 2 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
12 files changed, 246 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 8d8c2b3..3b97671 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -172,9 +172,8 @@ class Partition(val topic: String,
allReplicasMap.getAndMaybePut(replicaId, {
if (isReplicaLocal(replicaId)) {
val adminZkClient = new AdminZkClient(zkClient)
- val prop = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
- val config = LogConfig.fromProps(logManager.defaultConfig.originals,
- prop)
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+ val config =
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew,
replicaId == Request.FutureLocalReplicaId)
val checkpoint =
replicaManager.highWatermarkCheckpoints(log.dir.getParent)
val offsetMap = checkpoint.read()
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 8576913..fec984c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.{IsolationLevel,
ListOffsetRequest}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.collection.{Seq, mutable}
+import scala.collection.{Seq, Set, mutable}
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.message.{BrokerCompressionCodec, CompressionCodec,
NoCompressionCodec}
@@ -165,6 +165,17 @@ class Log(@volatile var dir: File,
0
}
+ def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
+ if ((updatedKeys.contains(LogConfig.RetentionMsProp)
+ || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
+ && topicPartition.partition == 0 // generate warnings only for one
partition of each topic
+ && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
+ warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is
set to ${newConfig.retentionMs}. It is smaller than " +
+ s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value
${newConfig.messageTimestampDifferenceMaxMs}. " +
+ s"This may result in frequent log rolling.")
+ this.config = newConfig
+ }
+
private def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of
$topicPartition is already closed")
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala
b/core/src/main/scala/kafka/log/LogConfig.scala
index 26cc504..30ca333 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -65,7 +65,8 @@ object Defaults {
val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
}
-case class LogConfig(props: java.util.Map[_, _]) extends
AbstractConfig(LogConfig.configDef, props, false) {
+case class LogConfig(props: java.util.Map[_, _], overriddenConfigs:
Set[String] = Set.empty)
+ extends AbstractConfig(LogConfig.configDef, props, false) {
/**
* Important note: Any configuration parameter that is passed along from
KafkaConfig to LogConfig
* should also go in [[kafka.server.KafkaServer.copyKafkaConfigToLog]].
@@ -276,7 +277,8 @@ object LogConfig {
val props = new Properties()
defaults.asScala.foreach { case (k, v) => props.put(k, v) }
props ++= overrides
- LogConfig(props)
+ val overriddenKeys =
overrides.keySet.asScala.map(_.asInstanceOf[String]).toSet
+ new LogConfig(props, overriddenKeys)
}
/**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 37a0be8..adf1b9c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -50,7 +50,7 @@ import scala.collection.mutable.ArrayBuffer
class LogManager(logDirs: Seq[File],
initialOfflineDirs: Seq[File],
val topicConfigs: Map[String, LogConfig], // note that this
doesn't get updated after creation
- val defaultConfig: LogConfig,
+ val initialDefaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
@@ -78,6 +78,11 @@ class LogManager(logDirs: Seq[File],
private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] =
createAndValidateLogDirs(logDirs, initialOfflineDirs)
+ @volatile var currentDefaultConfig = initialDefaultConfig
+
+ def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
+ this.currentDefaultConfig = logConfig
+ }
def liveLogDirs: Seq[File] = {
if (_liveLogDirs.size == logDirs.size)
@@ -232,7 +237,7 @@ class LogManager(logDirs: Seq[File],
private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long]): Unit = {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir)
- val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+ val config = topicConfigs.getOrElse(topicPartition.topic,
currentDefaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
@@ -386,11 +391,10 @@ class LogManager(logDirs: Seq[File],
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
- scheduler.schedule("kafka-delete-logs",
+ scheduler.schedule("kafka-delete-logs", // will be rescheduled after
each delete-logs run, with a dynamically determined period
deleteLogs _,
delay = InitialTaskDelayMs,
- period = defaultConfig.fileDeleteDelayMs,
- TimeUnit.MILLISECONDS)
+ unit = TimeUnit.MILLISECONDS)
}
if (cleanerConfig.enableCleaner)
cleaner.startup()
@@ -708,6 +712,19 @@ class LogManager(logDirs: Seq[File],
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
+ } finally {
+ try {
+ scheduler.schedule("kafka-delete-logs",
+ deleteLogs _,
+ delay = currentDefaultConfig.fileDeleteDelayMs,
+ unit = TimeUnit.MILLISECONDS)
+ } catch {
+ case e: Throwable =>
+ if (scheduler.isStarted) {
+ // No errors should occur unless the scheduler has been shut down
+ error(s"Failed to schedule next delete in kafka-delete-logs
thread", e)
+ }
+ }
}
}
@@ -902,7 +919,7 @@ object LogManager {
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
topicConfigs = topicConfigs,
- defaultConfig = defaultLogConfig,
+ initialDefaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 23322c1..7cad118 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -56,18 +56,11 @@ class TopicConfigHandler(private val logManager:
LogManager, kafkaConfig: KafkaC
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the
new LogConfig */
val props = new Properties()
- props ++= logManager.defaultConfig.originals.asScala
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
- val logConfig = LogConfig(props)
- if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
- ||
topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
- && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
- warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to
${logConfig.retentionMs}. It is smaller than " +
- s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value
${logConfig.messageTimestampDifferenceMaxMs}. " +
- s"This may result in frequent log rolling.")
- logs.foreach(_.config = logConfig)
+ val logConfig =
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
+ logs.foreach(_.updateConfig(topicConfig.asScala.keySet, logConfig))
}
def updateThrottledList(prop: String, quotaManager:
ReplicationQuotaManager) = {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2c186d3..b0dd7c0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,7 +22,7 @@ import java.util
import java.util.Properties
import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.log.LogCleaner
+import kafka.log.{LogCleaner, LogConfig, LogManager}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -79,6 +79,7 @@ object DynamicBrokerConfig {
val AllDynamicConfigs = mutable.Set[String]()
AllDynamicConfigs ++= DynamicSecurityConfigs
AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
+ AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
private val PerBrokerConfigs = DynamicSecurityConfigs
@@ -130,6 +131,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
+ addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit =
CoreUtils.inWriteLock(lock) {
@@ -400,3 +402,50 @@ trait BrokerReconfigurable {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
+
+object DynamicLogConfig {
+ // Exclude message.format.version for now since we need to check that the
version
+ // is supported on all brokers in the cluster.
+ val ExcludedConfigs = Set(KafkaConfig.LogMessageFormatVersionProp)
+
+ val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet --
ExcludedConfigs
+ val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case
(k, v) => (v, k) }
+}
+class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with
Logging {
+
+ override def configure(configs: util.Map[String, _]): Unit = {}
+
+ override def reconfigurableConfigs(): util.Set[String] = {
+ DynamicLogConfig.ReconfigurableConfigs.asJava
+ }
+
+ override def validateReconfiguration(configs: util.Map[String, _]): Boolean
= {
+ // For update of topic config overrides, only config names and types are
validated
+ // Names and types have already been validated. For consistency with topic
config
+ // validation, no additional validation is performed.
+ true
+ }
+
+ override def reconfigure(configs: util.Map[String, _]): Unit = {
+ val currentLogConfig = logManager.currentDefaultConfig
+ val newBrokerDefaults = new util.HashMap[String,
Object](currentLogConfig.originals)
+
configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach
{ case (k, v) =>
+ if (v != null) {
+ DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach {
configName =>
+ newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
+ }
+ }
+ }
+
+ logManager.reconfigureDefaultLogConfig(LogConfig(newBrokerDefaults))
+
+ logManager.allLogs.foreach { log =>
+ val props = mutable.Map.empty[Any, Any]
+ props ++= newBrokerDefaults.asScala
+ props ++=
log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
+
+ val logConfig = LogConfig(props.asJava)
+ log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fba186c..bc78214 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -987,7 +987,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp)
val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
- val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
+ def messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
def getNumReplicaAlterLogDirsThreads: Int = {
@@ -1019,42 +1019,41 @@ class KafkaConfig(val props: java.util.Map[_, _],
doLog: Boolean, dynamicConfigO
val autoCreateTopicsEnable =
getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
val logDirs =
CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
- val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
- val logFlushIntervalMessages =
getLong(KafkaConfig.LogFlushIntervalMessagesProp)
+ def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
+ def logFlushIntervalMessages =
getLong(KafkaConfig.LogFlushIntervalMessagesProp)
val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
val numRecoveryThreadsPerDataDir =
getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs =
getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs =
getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
val logFlushStartOffsetCheckpointIntervalMs =
getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
- val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
+ def logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
val offsetsRetentionCheckIntervalMs =
getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
- val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
+ def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
val logCleanerDedupeBufferSize =
getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
val logCleanerDedupeBufferLoadFactor =
getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
val logCleanerIoMaxBytesPerSecond =
getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
- val logCleanerDeleteRetentionMs =
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
- val logCleanerMinCompactionLagMs =
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
+ def logCleanerDeleteRetentionMs =
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
+ def logCleanerMinCompactionLagMs =
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
- val logCleanerMinCleanRatio =
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
+ def logCleanerMinCleanRatio =
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
- val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
- val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
- val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
- val logRollTimeMillis: java.lang.Long =
Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L *
getInt(KafkaConfig.LogRollTimeHoursProp))
- val logRollTimeJitterMillis: java.lang.Long =
Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 *
1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
- val logFlushIntervalMs: java.lang.Long =
Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
- val logRetentionTimeMillis = getLogRetentionTimeMillis
- val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
- val logPreAllocateEnable: java.lang.Boolean =
getBoolean(KafkaConfig.LogPreAllocateProp)
+ def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
+ def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
+ def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
+ def logRollTimeMillis: java.lang.Long =
Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L *
getInt(KafkaConfig.LogRollTimeHoursProp))
+ def logRollTimeJitterMillis: java.lang.Long =
Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 *
1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
+ def logFlushIntervalMs: java.lang.Long =
Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+ def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
+ def logPreAllocateEnable: java.lang.Boolean =
getBoolean(KafkaConfig.LogPreAllocateProp)
// We keep the user-provided String as `ApiVersion.apply` can choose a
slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val logMessageFormatVersionString =
getString(KafkaConfig.LogMessageFormatVersionProp)
val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
- val logMessageTimestampType =
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
- val logMessageTimestampDifferenceMaxMs: Long =
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+ def logMessageTimestampType =
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
+ def logMessageTimestampDifferenceMaxMs: Long =
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int =
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -1075,7 +1074,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
val autoLeaderRebalanceEnable =
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
val leaderImbalancePerBrokerPercentage =
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
val leaderImbalanceCheckIntervalSeconds =
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
- val uncleanLeaderElectionEnable: java.lang.Boolean =
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+ def uncleanLeaderElectionEnable: java.lang.Boolean =
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
val (interBrokerListenerName, interBrokerSecurityProtocol) =
getInterBrokerListenerNameAndSecurityProtocol
@@ -1170,7 +1169,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
val transactionIdExpirationMs =
getInt(KafkaConfig.TransactionalIdExpirationMsProp)
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
- val compressionType = getString(KafkaConfig.CompressionTypeProp)
+ def compressionType = getString(KafkaConfig.CompressionTypeProp)
val listeners: Seq[EndPoint] = getListeners
val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
private[kafka] lazy val listenerSecurityProtocolMap =
getListenerSecurityProtocolMap
@@ -1179,7 +1178,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
dynamicConfig.addReconfigurable(reconfigurable)
}
- private def getLogRetentionTimeMillis: Long = {
+ def logRetentionTimeMillis: Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index cac102c..6f45bca 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -442,7 +442,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
assertTrue(compressionType.isDefault)
assertFalse(compressionType.isSensitive)
- assertTrue(compressionType.isReadOnly)
+ assertFalse(compressionType.isReadOnly)
assertEquals(servers(2).config.values.size,
configs.get(brokerResource2).entries.size)
assertEquals(servers(2).config.brokerId.toString,
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
@@ -1024,7 +1024,7 @@ object AdminClientIntegrationTest {
var topicConfigEntries2 = Seq(new
ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER,
servers.head.config.brokerId.toString)
- val brokerConfigEntries = Seq(new
ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
+ val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp,
"localhost:2181")).asJava
// Alter configs: first and third are invalid, second is valid
var alterResult = client.alterConfigs(Map(
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c6f023f..c83a4dc 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -25,20 +25,24 @@ import java.util.{Collections, Properties}
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.api.SaslSetup
+import kafka.log.LogConfig
import kafka.coordinator.group.OffsetConfig
+import kafka.message.ProducerCompressionCodec
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.utils.Implicits._
import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords,
KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.{AuthenticationException,
InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer,
StringSerializer}
import org.junit.Assert._
@@ -88,6 +92,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslEnabledMechanismsProp,
kafkaServerSaslMechanisms.mkString(","))
+ props.put(KafkaConfig.LogSegmentBytesProp, "2000")
props ++= sslProperties1
addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -279,6 +284,96 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogCleanerBackoffMsProp, "8000"))
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+ // Verify that produce/consume worked throughout this test without any
retries in producer
+ stopAndVerifyProduceConsume(producerThread, consumerThread,
mayFailRequests = false)
+ }
+
+ @Test
+ def testDefaultTopicConfig(): Unit = {
+ val (producerThread, consumerThread) = startProduceConsume(retries = 0)
+
+ val props = new Properties
+ props.put(KafkaConfig.LogSegmentBytesProp, "10000")
+ props.put(KafkaConfig.LogRollTimeMillisProp,
TimeUnit.HOURS.toMillis(2).toString)
+ props.put(KafkaConfig.LogRollTimeJitterMillisProp,
TimeUnit.HOURS.toMillis(1).toString)
+ props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "100000")
+ props.put(KafkaConfig.LogFlushIntervalMessagesProp, "1000")
+ props.put(KafkaConfig.LogFlushIntervalMsProp, "60000")
+ props.put(KafkaConfig.LogRetentionBytesProp, "10000000")
+ props.put(KafkaConfig.LogRetentionTimeMillisProp,
TimeUnit.DAYS.toMillis(1).toString)
+ props.put(KafkaConfig.MessageMaxBytesProp, "100000")
+ props.put(KafkaConfig.LogIndexIntervalBytesProp, "10000")
+ props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp,
TimeUnit.DAYS.toMillis(1).toString)
+ props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, "60000")
+ props.put(KafkaConfig.LogDeleteDelayMsProp, "60000")
+ props.put(KafkaConfig.LogCleanerMinCleanRatioProp, "0.3")
+ props.put(KafkaConfig.LogCleanupPolicyProp, "delete")
+ props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
+ props.put(KafkaConfig.MinInSyncReplicasProp, "2")
+ props.put(KafkaConfig.CompressionTypeProp, "gzip")
+ props.put(KafkaConfig.LogPreAllocateProp, true.toString)
+ props.put(KafkaConfig.LogMessageTimestampTypeProp,
TimestampType.LOG_APPEND_TIME.toString)
+ props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogSegmentBytesProp, "10000"))
+
+ // Verify that all broker defaults have been updated
+ servers.foreach { server =>
+ props.asScala.foreach { case (k, v) =>
+ assertEquals(s"Not reconfigured $k",
server.config.originals.get(k).toString, v)
+ }
+ }
+
+ // Verify that configs of existing logs have been updated
+ val newLogConfig =
LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
+ assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
+ val log = servers.head.logManager.getLog(new TopicPartition(topic,
0)).getOrElse(throw new IllegalStateException("Log not found"))
+ TestUtils.waitUntilTrue(() => log.config.segmentSize == 10000, "Existing
topic config using defaults not updated")
+ props.asScala.foreach { case (k, v) =>
+ val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
+ val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]"
else v
+ assertEquals(s"Not reconfigured $logConfigName for existing log",
expectedValue,
+ log.config.originals.get(logConfigName).toString)
+ }
+ consumerThread.waitForMatchingRecords(record => record.timestampType ==
TimestampType.LOG_APPEND_TIME)
+
+ // Verify that the new config is actually used for new segments of
existing logs
+ TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 9000), "Log
segment size increase not applied")
+
+ // Verify that overridden topic configs are not updated when broker
default is updated
+ val log2 = servers.head.logManager.getLog(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
+ .getOrElse(throw new IllegalStateException("Log not found"))
+ assertFalse("Overridden clean up policy should not be updated",
log2.config.delete)
+ assertEquals(ProducerCompressionCodec.name, log2.config.compressionType)
+
+ // Verify that we can alter subset of log configs
+ props.clear()
+ props.put(KafkaConfig.LogMessageTimestampTypeProp,
TimestampType.CREATE_TIME.toString)
+ props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString))
+ consumerThread.waitForMatchingRecords(record => record.timestampType ==
TimestampType.CREATE_TIME)
+
+ // Verify that even though broker defaults can be defined at default
cluster level for consistent
+ // configuration across brokers, they can also be defined at per-broker
level for testing
+ props.clear()
+ props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
+ alterConfigsOnServer(servers.head, props)
+ assertEquals(500000,
servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
+ servers.tail.foreach { server =>
assertEquals(Defaults.LogIndexSizeMaxBytes,
server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
+
+ // Verify that invalid configs are not applied
+ val invalidProps = Map(
+ KafkaConfig.LogMessageTimestampDifferenceMaxMsProp -> "abc", // Invalid
type
+ KafkaConfig.LogMessageTimestampTypeProp -> "invalid", // Invalid value
+ KafkaConfig.LogRollTimeMillisProp -> "0" // Fails KafkaConfig validation
+ )
+ invalidProps.foreach { case (k, v) =>
+ val newProps = new Properties
+ newProps.putAll(props)
+ props.put(k, v)
+ reconfigureServers(props, perBrokerConfig = false, (k,
props.getProperty(k)), expectFailure = true)
+ }
+
+ // Verify that produce/consume worked throughout this test without any
retries in producer
stopAndVerifyProduceConsume(producerThread, consumerThread,
mayFailRequests = false)
}
@@ -388,6 +483,14 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
adminClient.alterConfigs(configs)
}
+ private def alterConfigsOnServer(server: KafkaServer, props: Properties):
Unit = {
+ val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k,
v) }.toList.asJava
+ val newConfig = new Config(configEntries)
+ val configs = Map(new ConfigResource(ConfigResource.Type.BROKER,
server.config.brokerId.toString) -> newConfig).asJava
+ adminClients.head.alterConfigs(configs).all.get
+ props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v)
}
+ }
+
private def reconfigureServers(newProps: Properties, perBrokerConfig:
Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit
= {
val alterResult = alterConfigs(adminClients.head, newProps,
perBrokerConfig)
if (expectFailure) {
@@ -435,10 +538,12 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
}
private def waitForConfig(propName: String, propValue: String, maxWaitMs:
Long = 10000): Unit = {
- servers.foreach { server =>
- TestUtils.retry(maxWaitMs) {
- assertEquals(propValue, server.config.originals.get(propName))
- }
+ servers.foreach { server => waitForConfigOnServer(server, propName,
propValue, maxWaitMs) }
+ }
+
+ private def waitForConfigOnServer(server: KafkaServer, propName: String,
propValue: String, maxWaitMs: Long = 10000): Unit = {
+ TestUtils.retry(maxWaitMs) {
+ assertEquals(propValue, server.config.originals.get(propName))
}
}
@@ -505,20 +610,35 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
private class ConsumerThread(producerThread: ProducerThread) extends
ShutdownableThread("test-consumer", isInterruptible = false) {
private val consumer = createConsumer("group1", trustStoreFile1)
+ @volatile var lastBatch: ConsumerRecords[String, String] = _
@volatile private var endTimeMs = Long.MaxValue
var received = 0
override def doWork(): Unit = {
try {
while (isRunning || (received < producerThread.sent &&
System.currentTimeMillis < endTimeMs)) {
- received += consumer.poll(50).count
+ val records = consumer.poll(50)
+ received += records.count
+ if (!records.isEmpty)
+ lastBatch = records
}
} finally {
consumer.close()
}
}
+
override def initiateShutdown(): Boolean = {
endTimeMs = System.currentTimeMillis + 10 * 1000
super.initiateShutdown()
}
+
+ def waitForMatchingRecords(predicate: ConsumerRecord[String, String] =>
Boolean): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ val records = lastBatch
+ if (records == null || records.isEmpty)
+ false
+ else
+ records.asScala.toList.exists(predicate)
+ }, "Received records did not match")
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
index 455bbde..1f01dc3 100644
--- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
@@ -31,7 +31,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
@Test
def testDefaultKafkaConfig() {
- assert(servers.head.getLogManager().defaultConfig.minInSyncReplicas == 5)
+ assert(servers.head.getLogManager().initialDefaultConfig.minInSyncReplicas
== 5)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9533b00..aa8236a 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -181,7 +181,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(new TopicPartition(topic, part),
logManager.defaultConfig)
+ val log = logManager.getOrCreateLog(new TopicPartition(topic, part),
logManager.initialDefaultConfig)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -210,7 +210,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(new TopicPartition(topic, part),
logManager.defaultConfig)
+ val log = logManager.getOrCreateLog(new TopicPartition(topic, part),
logManager.initialDefaultConfig)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes()), leaderEpoch = 0)
log.flush()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 30a10c7..ea94c76 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1022,7 +1022,7 @@ object TestUtils extends Logging {
new LogManager(logDirs = logDirs,
initialOfflineDirs = Array.empty[File],
topicConfigs = Map(),
- defaultConfig = defaultConfig,
+ initialDefaultConfig = defaultConfig,
cleanerConfig = cleanerConfig,
ioThreads = 4,
flushCheckMs = 1000L,
--
To stop receiving notification emails like this one, please contact
[email protected].