This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b3722c  KAFKA-6245: Dynamic update of topic config defaults (#4466)
9b3722c is described below

commit 9b3722cea1845784853073d2a22764ec9fd5a8e5
Author: Rajini Sivaram <[email protected]>
AuthorDate: Sat Jan 27 12:22:05 2018 -0800

    KAFKA-6245: Dynamic update of topic config defaults (#4466)
    
    Dynamic update of default topic configs as described in KIP-226.
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   5 +-
 core/src/main/scala/kafka/log/Log.scala            |  13 +-
 core/src/main/scala/kafka/log/LogConfig.scala      |   6 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  29 ++++-
 .../main/scala/kafka/server/ConfigHandler.scala    |  11 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  51 +++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  43 ++++---
 .../kafka/api/AdminClientIntegrationTest.scala     |   4 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 132 ++++++++++++++++++++-
 .../unit/kafka/integration/MinIsrConfigTest.scala  |   2 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   2 +-
 12 files changed, 246 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 8d8c2b3..3b97671 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -172,9 +172,8 @@ class Partition(val topic: String,
     allReplicasMap.getAndMaybePut(replicaId, {
       if (isReplicaLocal(replicaId)) {
         val adminZkClient = new AdminZkClient(zkClient)
-        val prop = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
-        val config = LogConfig.fromProps(logManager.defaultConfig.originals,
-                                         prop)
+        val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+        val config = 
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
         val log = logManager.getOrCreateLog(topicPartition, config, isNew, 
replicaId == Request.FutureLocalReplicaId)
         val checkpoint = 
replicaManager.highWatermarkCheckpoints(log.dir.getParent)
         val offsetMap = checkpoint.read()
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 8576913..fec984c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, 
ListOffsetRequest}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.collection.{Seq, mutable}
+import scala.collection.{Seq, Set, mutable}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, 
NoCompressionCodec}
@@ -165,6 +165,17 @@ class Log(@volatile var dir: File,
       0
   }
 
+  def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
+    if ((updatedKeys.contains(LogConfig.RetentionMsProp)
+      || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
+      && topicPartition.partition == 0  // generate warnings only for one 
partition of each topic
+      && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
+      warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is 
set to ${newConfig.retentionMs}. It is smaller than " +
+        s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value 
${newConfig.messageTimestampDifferenceMaxMs}. " +
+        s"This may result in frequent log rolling.")
+    this.config = newConfig
+  }
+
   private def checkIfMemoryMappedBufferClosed(): Unit = {
     if (isMemoryMappedBufferClosed)
       throw new KafkaStorageException(s"The memory mapped buffer for log of 
$topicPartition is already closed")
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index 26cc504..30ca333 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -65,7 +65,8 @@ object Defaults {
   val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
 }
 
-case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfig.configDef, props, false) {
+case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: 
Set[String] = Set.empty)
+  extends AbstractConfig(LogConfig.configDef, props, false) {
   /**
    * Important note: Any configuration parameter that is passed along from 
KafkaConfig to LogConfig
    * should also go in [[kafka.server.KafkaServer.copyKafkaConfigToLog]].
@@ -276,7 +277,8 @@ object LogConfig {
     val props = new Properties()
     defaults.asScala.foreach { case (k, v) => props.put(k, v) }
     props ++= overrides
-    LogConfig(props)
+    val overriddenKeys = 
overrides.keySet.asScala.map(_.asInstanceOf[String]).toSet
+    new LogConfig(props, overriddenKeys)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 37a0be8..adf1b9c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -50,7 +50,7 @@ import scala.collection.mutable.ArrayBuffer
 class LogManager(logDirs: Seq[File],
                  initialOfflineDirs: Seq[File],
                  val topicConfigs: Map[String, LogConfig], // note that this 
doesn't get updated after creation
-                 val defaultConfig: LogConfig,
+                 val initialDefaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
                  ioThreads: Int,
                  val flushCheckMs: Long,
@@ -78,6 +78,11 @@ class LogManager(logDirs: Seq[File],
   private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = 
createAndValidateLogDirs(logDirs, initialOfflineDirs)
+  @volatile var currentDefaultConfig = initialDefaultConfig
+
+  def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
+    this.currentDefaultConfig = logConfig
+  }
 
   def liveLogDirs: Seq[File] = {
     if (_liveLogDirs.size == logDirs.size)
@@ -232,7 +237,7 @@ class LogManager(logDirs: Seq[File],
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], 
logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+    val config = topicConfigs.getOrElse(topicPartition.topic, 
currentDefaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
     val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
@@ -386,11 +391,10 @@ class LogManager(logDirs: Seq[File],
                          delay = InitialTaskDelayMs,
                          period = flushStartOffsetCheckpointMs,
                          TimeUnit.MILLISECONDS)
-      scheduler.schedule("kafka-delete-logs",
+      scheduler.schedule("kafka-delete-logs", // will be rescheduled after 
each delete logs with a dynamic period
                          deleteLogs _,
                          delay = InitialTaskDelayMs,
-                         period = defaultConfig.fileDeleteDelayMs,
-                         TimeUnit.MILLISECONDS)
+                         unit = TimeUnit.MILLISECONDS)
     }
     if (cleanerConfig.enableCleaner)
       cleaner.startup()
@@ -708,6 +712,19 @@ class LogManager(logDirs: Seq[File],
     } catch {
       case e: Throwable =>
         error(s"Exception in kafka-delete-logs thread.", e)
+    } finally {
+      try {
+        scheduler.schedule("kafka-delete-logs",
+          deleteLogs _,
+          delay = currentDefaultConfig.fileDeleteDelayMs,
+          unit = TimeUnit.MILLISECONDS)
+      } catch {
+        case e: Throwable =>
+          if (scheduler.isStarted) {
+            // No errors should occur unless the scheduler has been shut down
+            error(s"Failed to schedule next delete in kafka-delete-logs 
thread", e)
+          }
+      }
     }
   }
 
@@ -902,7 +919,7 @@ object LogManager {
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
       topicConfigs = topicConfigs,
-      defaultConfig = defaultLogConfig,
+      initialDefaultConfig = defaultLogConfig,
       cleanerConfig = cleanerConfig,
       ioThreads = config.numRecoveryThreadsPerDataDir,
       flushCheckMs = config.logFlushSchedulerIntervalMs,
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 23322c1..7cad118 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -56,18 +56,11 @@ class TopicConfigHandler(private val logManager: 
LogManager, kafkaConfig: KafkaC
     if (logs.nonEmpty) {
       /* combine the default properties with the overrides in zk to create the 
new LogConfig */
       val props = new Properties()
-      props ++= logManager.defaultConfig.originals.asScala
       topicConfig.asScala.foreach { case (key, value) =>
         if (!configNamesToExclude.contains(key)) props.put(key, value)
       }
-      val logConfig = LogConfig(props)
-      if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
-        || 
topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
-        && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
-        warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to 
${logConfig.retentionMs}. It is smaller than " +
-          s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value 
${logConfig.messageTimestampDifferenceMaxMs}. " +
-          s"This may result in frequent log rolling.")
-      logs.foreach(_.config = logConfig)
+      val logConfig = 
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
+      logs.foreach(_.updateConfig(topicConfig.asScala.keySet, logConfig))
     }
 
     def updateThrottledList(prop: String, quotaManager: 
ReplicationQuotaManager) = {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2c186d3..b0dd7c0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.Properties
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.log.LogCleaner
+import kafka.log.{LogCleaner, LogConfig, LogManager}
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -79,6 +79,7 @@ object DynamicBrokerConfig {
   val AllDynamicConfigs = mutable.Set[String]()
   AllDynamicConfigs ++= DynamicSecurityConfigs
   AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
+  AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
 
   private val PerBrokerConfigs = DynamicSecurityConfigs
 
@@ -130,6 +131,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
+    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
@@ -400,3 +402,50 @@ trait BrokerReconfigurable {
 
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
 }
+
+object DynamicLogConfig {
+  // Exclude message.format.version for now since we need to check that the 
version
+  // is supported on all brokers in the cluster.
+  val ExcludedConfigs = Set(KafkaConfig.LogMessageFormatVersionProp)
+
+  val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- 
ExcludedConfigs
+  val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case 
(k, v) => (v, k) }
+}
+class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with 
Logging {
+
+  override def configure(configs: util.Map[String, _]): Unit = {}
+
+  override def reconfigurableConfigs(): util.Set[String] = {
+    DynamicLogConfig.ReconfigurableConfigs.asJava
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Boolean 
= {
+    // When topic config overrides are updated, only config names and types are 
validated.
+    // Here, names and types have already been validated, so for consistency with topic 
config
+    // validation, no additional validation is performed.
+    true
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    val currentLogConfig = logManager.currentDefaultConfig
+    val newBrokerDefaults = new util.HashMap[String, 
Object](currentLogConfig.originals)
+    
configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach
 { case (k, v) =>
+      if (v != null) {
+        DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { 
configName =>
+          newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
+        }
+      }
+    }
+
+    logManager.reconfigureDefaultLogConfig(LogConfig(newBrokerDefaults))
+
+    logManager.allLogs.foreach { log =>
+      val props = mutable.Map.empty[Any, Any]
+      props ++= newBrokerDefaults.asScala
+      props ++= 
log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
+
+      val logConfig = LogConfig(props.asJava)
+      log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fba186c..bc78214 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -987,7 +987,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
   val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp)
   val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
-  val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
+  def messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
   val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
 
   def getNumReplicaAlterLogDirsThreads: Int = {
@@ -1019,42 +1019,41 @@ class KafkaConfig(val props: java.util.Map[_, _], 
doLog: Boolean, dynamicConfigO
   val autoCreateTopicsEnable = 
getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
   val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
   val logDirs = 
CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
-  val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
-  val logFlushIntervalMessages = 
getLong(KafkaConfig.LogFlushIntervalMessagesProp)
+  def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
+  def logFlushIntervalMessages = 
getLong(KafkaConfig.LogFlushIntervalMessagesProp)
   val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
   val numRecoveryThreadsPerDataDir = 
getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
   val logFlushSchedulerIntervalMs = 
getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
   val logFlushOffsetCheckpointIntervalMs = 
getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
   val logFlushStartOffsetCheckpointIntervalMs = 
getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
   val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
-  val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
+  def logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
   val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
   val offsetsRetentionCheckIntervalMs = 
getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
-  val logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
+  def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
   val logCleanerDedupeBufferSize = 
getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
   val logCleanerDedupeBufferLoadFactor = 
getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
   val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
   val logCleanerIoMaxBytesPerSecond = 
getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
-  val logCleanerDeleteRetentionMs = 
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
-  val logCleanerMinCompactionLagMs = 
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
+  def logCleanerDeleteRetentionMs = 
getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
+  def logCleanerMinCompactionLagMs = 
getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
   val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
-  val logCleanerMinCleanRatio = 
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
+  def logCleanerMinCleanRatio = 
getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
   val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
-  val logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
-  val logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
-  val logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
-  val logRollTimeMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * 
getInt(KafkaConfig.LogRollTimeHoursProp))
-  val logRollTimeJitterMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 
1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
-  val logFlushIntervalMs: java.lang.Long = 
Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
-  val logRetentionTimeMillis = getLogRetentionTimeMillis
-  val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
-  val logPreAllocateEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.LogPreAllocateProp)
+  def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
+  def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
+  def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
+  def logRollTimeMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * 
getInt(KafkaConfig.LogRollTimeHoursProp))
+  def logRollTimeJitterMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 
1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
+  def logFlushIntervalMs: java.lang.Long = 
Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+  def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
+  def logPreAllocateEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.LogPreAllocateProp)
   // We keep the user-provided String as `ApiVersion.apply` can choose a 
slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
   val logMessageFormatVersionString = 
getString(KafkaConfig.LogMessageFormatVersionProp)
   val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
-  val logMessageTimestampType = 
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
-  val logMessageTimestampDifferenceMaxMs: Long = 
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+  def logMessageTimestampType = 
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
+  def logMessageTimestampDifferenceMaxMs: Long = 
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -1075,7 +1074,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   val autoLeaderRebalanceEnable = 
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = 
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  val uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  def uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
 
   val (interBrokerListenerName, interBrokerSecurityProtocol) = 
getInterBrokerListenerNameAndSecurityProtocol
 
@@ -1170,7 +1169,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   val transactionIdExpirationMs = 
getInt(KafkaConfig.TransactionalIdExpirationMsProp)
 
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
-  val compressionType = getString(KafkaConfig.CompressionTypeProp)
+  def compressionType = getString(KafkaConfig.CompressionTypeProp)
   val listeners: Seq[EndPoint] = getListeners
   val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
   private[kafka] lazy val listenerSecurityProtocolMap = 
getListenerSecurityProtocolMap
@@ -1179,7 +1178,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     dynamicConfig.addReconfigurable(reconfigurable)
   }
 
-  private def getLogRetentionTimeMillis: Long = {
+  def logRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index cac102c..6f45bca 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -442,7 +442,7 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)
     assertTrue(compressionType.isDefault)
     assertFalse(compressionType.isSensitive)
-    assertTrue(compressionType.isReadOnly)
+    assertFalse(compressionType.isReadOnly)
 
     assertEquals(servers(2).config.values.size, 
configs.get(brokerResource2).entries.size)
     assertEquals(servers(2).config.brokerId.toString, 
configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
@@ -1024,7 +1024,7 @@ object AdminClientIntegrationTest {
     var topicConfigEntries2 = Seq(new 
ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava
 
     val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
servers.head.config.brokerId.toString)
-    val brokerConfigEntries = Seq(new 
ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava
+    val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, 
"localhost:2181")).asJava
 
     // Alter configs: first and third are invalid, second is valid
     var alterResult = client.alterConfigs(Map(
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c6f023f..c83a4dc 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -25,20 +25,24 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.{ExecutionException, TimeUnit}
 
 import kafka.api.SaslSetup
+import kafka.log.LogConfig
 import kafka.coordinator.group.OffsetConfig
+import kafka.message.ProducerCompressionCodec
 import kafka.utils.{ShutdownableThread, TestUtils}
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, 
KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.SslConfigs._
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.{AuthenticationException, 
InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
 import org.junit.Assert._
@@ -88,6 +92,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
       props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, 
kafkaServerSaslMechanisms.mkString(","))
+      props.put(KafkaConfig.LogSegmentBytesProp, "2000")
 
       props ++= sslProperties1
       addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -279,6 +284,96 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerBackoffMsProp, "8000"))
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
 
+    // Verify that produce/consume worked throughout this test without any 
retries in producer
+    stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
+  }
+
+  @Test
+  def testDefaultTopicConfig(): Unit = {
+    val (producerThread, consumerThread) = startProduceConsume(retries = 0)
+
+    val props = new Properties
+    props.put(KafkaConfig.LogSegmentBytesProp, "10000")
+    props.put(KafkaConfig.LogRollTimeMillisProp, 
TimeUnit.HOURS.toMillis(2).toString)
+    props.put(KafkaConfig.LogRollTimeJitterMillisProp, 
TimeUnit.HOURS.toMillis(1).toString)
+    props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "100000")
+    props.put(KafkaConfig.LogFlushIntervalMessagesProp, "1000")
+    props.put(KafkaConfig.LogFlushIntervalMsProp, "60000")
+    props.put(KafkaConfig.LogRetentionBytesProp, "10000000")
+    props.put(KafkaConfig.LogRetentionTimeMillisProp, 
TimeUnit.DAYS.toMillis(1).toString)
+    props.put(KafkaConfig.MessageMaxBytesProp, "100000")
+    props.put(KafkaConfig.LogIndexIntervalBytesProp, "10000")
+    props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, 
TimeUnit.DAYS.toMillis(1).toString)
+    props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, "60000")
+    props.put(KafkaConfig.LogDeleteDelayMsProp, "60000")
+    props.put(KafkaConfig.LogCleanerMinCleanRatioProp, "0.3")
+    props.put(KafkaConfig.LogCleanupPolicyProp, "delete")
+    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
+    props.put(KafkaConfig.MinInSyncReplicasProp, "2")
+    props.put(KafkaConfig.CompressionTypeProp, "gzip")
+    props.put(KafkaConfig.LogPreAllocateProp, true.toString)
+    props.put(KafkaConfig.LogMessageTimestampTypeProp, 
TimestampType.LOG_APPEND_TIME.toString)
+    props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogSegmentBytesProp, "10000"))
+
+    // Verify that all broker defaults have been updated
+    servers.foreach { server =>
+      props.asScala.foreach { case (k, v) =>
+        assertEquals(s"Not reconfigured $k", 
server.config.originals.get(k).toString, v)
+      }
+    }
+
+    // Verify that configs of existing logs have been updated
+    val newLogConfig = 
LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
+    assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
+    val log = servers.head.logManager.getLog(new TopicPartition(topic, 
0)).getOrElse(throw new IllegalStateException("Log not found"))
+    TestUtils.waitUntilTrue(() => log.config.segmentSize == 10000, "Existing 
topic config using defaults not updated")
+    props.asScala.foreach { case (k, v) =>
+      val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
+      val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" 
else v
+      assertEquals(s"Not reconfigured $logConfigName for existing log", 
expectedValue,
+        log.config.originals.get(logConfigName).toString)
+    }
+    consumerThread.waitForMatchingRecords(record => record.timestampType == 
TimestampType.LOG_APPEND_TIME)
+
+    // Verify that the new config is actually used for new segments of 
existing logs
+    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 9000), "Log 
segment size increase not applied")
+
+    // Verify that overridden topic configs are not updated when broker 
default is updated
+    val log2 = servers.head.logManager.getLog(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
+      .getOrElse(throw new IllegalStateException("Log not found"))
+    assertFalse("Overridden clean up policy should not be updated", 
log2.config.delete)
+    assertEquals(ProducerCompressionCodec.name, log2.config.compressionType)
+
+    // Verify that we can alter subset of log configs
+    props.clear()
+    props.put(KafkaConfig.LogMessageTimestampTypeProp, 
TimestampType.CREATE_TIME.toString)
+    props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString))
+    consumerThread.waitForMatchingRecords(record => record.timestampType == 
TimestampType.CREATE_TIME)
+
+    // Verify that even though broker defaults can be defined at default 
cluster level for consistent
+    // configuration across brokers, they can also be defined at per-broker 
level for testing
+    props.clear()
+    props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
+    alterConfigsOnServer(servers.head, props)
+    assertEquals(500000, 
servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
+    servers.tail.foreach { server => 
assertEquals(Defaults.LogIndexSizeMaxBytes, 
server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
+
+    // Verify that invalid configs are not applied
+    val invalidProps = Map(
+      KafkaConfig.LogMessageTimestampDifferenceMaxMsProp -> "abc", // Invalid 
type
+      KafkaConfig.LogMessageTimestampTypeProp -> "invalid", // Invalid value
+      KafkaConfig.LogRollTimeMillisProp -> "0" // Fails KafkaConfig validation
+    )
+    invalidProps.foreach { case (k, v) =>
+      val newProps = new Properties
+      newProps.putAll(props)
+      props.put(k, v)
+      reconfigureServers(props, perBrokerConfig = false, (k, 
props.getProperty(k)), expectFailure = true)
+    }
+
+    // Verify that produce/consume worked throughout this test without any 
retries in producer
     stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
   }
 
@@ -388,6 +483,14 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     adminClient.alterConfigs(configs)
   }
 
+  private def alterConfigsOnServer(server: KafkaServer, props: Properties): 
Unit = {
+    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, 
v) }.toList.asJava
+    val newConfig = new Config(configEntries)
+    val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, 
server.config.brokerId.toString) -> newConfig).asJava
+    adminClients.head.alterConfigs(configs).all.get
+    props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) 
}
+  }
+
   private def reconfigureServers(newProps: Properties, perBrokerConfig: 
Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit 
= {
     val alterResult = alterConfigs(adminClients.head, newProps, 
perBrokerConfig)
     if (expectFailure) {
@@ -435,10 +538,12 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
   }
 
   private def waitForConfig(propName: String, propValue: String, maxWaitMs: 
Long = 10000): Unit = {
-    servers.foreach { server =>
-      TestUtils.retry(maxWaitMs) {
-        assertEquals(propValue, server.config.originals.get(propName))
-      }
+    servers.foreach { server => waitForConfigOnServer(server, propName, 
propValue, maxWaitMs) }
+  }
+
+  private def waitForConfigOnServer(server: KafkaServer, propName: String, 
propValue: String, maxWaitMs: Long = 10000): Unit = {
+    TestUtils.retry(maxWaitMs) {
+      assertEquals(propValue, server.config.originals.get(propName))
     }
   }
 
@@ -505,20 +610,35 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
   private class ConsumerThread(producerThread: ProducerThread) extends 
ShutdownableThread("test-consumer", isInterruptible = false) {
     private val consumer = createConsumer("group1", trustStoreFile1)
+    @volatile var lastBatch: ConsumerRecords[String, String] = _
     @volatile private var endTimeMs = Long.MaxValue
     var received = 0
     override def doWork(): Unit = {
       try {
         while (isRunning || (received < producerThread.sent && 
System.currentTimeMillis < endTimeMs)) {
-          received += consumer.poll(50).count
+          val records = consumer.poll(50)
+          received += records.count
+          if (!records.isEmpty)
+            lastBatch = records
         }
       } finally {
         consumer.close()
       }
     }
+
     override def initiateShutdown(): Boolean = {
       endTimeMs = System.currentTimeMillis + 10 * 1000
       super.initiateShutdown()
     }
+
+    def waitForMatchingRecords(predicate: ConsumerRecord[String, String] => 
Boolean): Unit = {
+      TestUtils.waitUntilTrue(() => {
+        val records = lastBatch
+        if (records == null || records.isEmpty)
+          false
+        else
+          records.asScala.toList.exists(predicate)
+      }, "Received records did not match")
+    }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala 
b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
index 455bbde..1f01dc3 100644
--- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
@@ -31,7 +31,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
 
   @Test
   def testDefaultKafkaConfig() {
-    assert(servers.head.getLogManager().defaultConfig.minInSyncReplicas == 5)
+    assert(servers.head.getLogManager().initialDefaultConfig.minInSyncReplicas 
== 5)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9533b00..aa8236a 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -181,7 +181,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), 
logManager.defaultConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), 
logManager.initialDefaultConfig)
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = 
Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -210,7 +210,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), 
logManager.defaultConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), 
logManager.initialDefaultConfig)
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = 
Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 30a10c7..ea94c76 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1022,7 +1022,7 @@ object TestUtils extends Logging {
     new LogManager(logDirs = logDirs,
                    initialOfflineDirs = Array.empty[File],
                    topicConfigs = Map(),
-                   defaultConfig = defaultConfig,
+                   initialDefaultConfig = defaultConfig,
                    cleanerConfig = cleanerConfig,
                    ioThreads = 4,
                    flushCheckMs = 1000L,

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to