This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 57b098c397c KAFKA-17584: Fix incorrect synonym handling for dynamic 
log configurations (#17258)
57b098c397c is described below

commit 57b098c397c441f645a31ec64b9e346fc0f55b6b
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Sep 25 23:16:19 2024 -0700

    KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations 
(#17258)
    
    Several Kafka log configurations have synonyms. For example, log 
retention can be configured
    either by log.retention.ms, or by log.retention.minutes, or by 
log.retention.hours. There is also
    a facility in Kafka to dynamically change broker configurations without 
restarting the broker. These
    dynamically set configurations are stored in the metadata log and override 
what is in the broker
    properties file.
    
    Unfortunately, these two features interacted poorly; there was a bug where 
the dynamic log
    configuration update code ignored synonyms. For example, if you set 
log.retention.minutes and then
    reconfigured something unrelated that triggered the LogConfig update path, 
the retention value that
    you had configured was overwritten.
    
    The reason for this was incorrect handling of synonyms. The code tried to 
treat the Kafka broker
    configuration as a bag of key/value entities rather than extracting the 
correct retention time (or
    other setting with overrides) from the KafkaConfig object.
    
    Reviewers: Luke Chen <[email protected]>, Jun Rao <[email protected]>, Kamal 
Chandraprakash <[email protected]>, Christo Lolov 
<[email protected]>, Federico Valeri <[email protected]>, Rajini Sivaram 
<[email protected]>, amangandhi94 <>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  43 +++++----
 .../server/DynamicBrokerReconfigurationTest.scala  |   6 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 106 +++++++++++++++------
 3 files changed, 105 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 23c99e5bcb3..2dff381602e 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@@ -40,7 +40,7 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.{ConfigType, ServerConfigs, 
ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, 
ZooKeeperInternals}
+import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, 
MetricConfigs}
 import org.apache.kafka.server.telemetry.ClientTelemetry
@@ -662,12 +662,24 @@ trait BrokerReconfigurable {
 }
 
 object DynamicLogConfig {
-  // Exclude message.format.version for now since we need to check that the 
version
-  // is supported on all brokers in the cluster.
+  /**
+   * The log configurations that are non-reconfigurable. This set contains the 
names you
+   * would use when setting a dynamic configuration on a topic, which are 
different than the
+   * corresponding broker configuration names.
+   *
+   * For now, message.format.version is not reconfigurable, since we need to 
check that
+   * the version is supported on all brokers in the cluster.
+   */
+  val NonReconfigrableLogConfigs: Set[String] = 
Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
+
+  /**
+   * The broker configurations pertaining to logs that are reconfigurable. 
This set contains
+   * the names you would use when setting a static or dynamic broker 
configuration (not topic
+   * configuration).
+   */
   val ReconfigurableConfigs: Set[String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - 
ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
-  val KafkaConfigToLogConfigName: Map[String, String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) 
=> (v, k) }
+    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.
+      filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet
 }
 
 class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends 
BrokerReconfigurable with Logging {
@@ -732,17 +744,14 @@ class DynamicLogConfig(logManager: LogManager, server: 
KafkaBroker) extends Brok
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     val originalLogConfig = logManager.currentDefaultConfig
     val originalUncleanLeaderElectionEnable = 
originalLogConfig.uncleanLeaderElectionEnable
-    val newBrokerDefaults = new util.HashMap[String, 
Object](originalLogConfig.originals)
-    newConfig.valuesFromThisConfig.forEach { (k, v) =>
-      if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
-        DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { 
configName =>
-          if (v == null)
-             newBrokerDefaults.remove(configName)
-          else
-            newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
-        }
+    val newBrokerDefaults = new util.HashMap[String, 
Object](newConfig.extractLogConfigMap)
+    val originalLogConfigMap = originalLogConfig.originals()
+    DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
+      Option(originalLogConfigMap.get(k)) match {
+        case None => newBrokerDefaults.remove(k)
+        case Some(v) => newBrokerDefaults.put(k, v)
       }
-    }
+    })
 
     logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
 
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index acbbd6cfc60..3df10d3cf27 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -63,7 +63,7 @@ import 
org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
 import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
-import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
 import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.ShutdownableThread
@@ -667,8 +667,10 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 
0)).getOrElse(throw new IllegalStateException("Log not found"))
     TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing 
topic config using defaults not updated")
+    val KafkaConfigToLogConfigName: Map[String, String] =
+      ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, 
v) => (v, k) }
     props.asScala.foreach { case (k, v) =>
-      val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
+      val logConfigName = KafkaConfigToLogConfigName(k)
       val expectedValue = if (k == ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG) 
s"[$v]" else v
       assertEquals(expectedValue, 
log.config.originals.get(logConfigName).toString,
         s"Not reconfigured $logConfigName for existing log")
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index e83f5a41a3a..bac6149de5a 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.{lang, util}
 import java.util.{Properties, Map => JMap}
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{CompletionStage, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import kafka.controller.KafkaController
 import kafka.log.LogManager
@@ -57,7 +57,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testConfigUpdate(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     val oldKeystore = "oldKs.jks"
     props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
     val config = KafkaConfig(props)
@@ -102,7 +102,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testEnableDefaultUncleanLeaderElection(): Unit = {
-    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     origProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"false")
 
     val config = KafkaConfig(origProps)
@@ -133,7 +133,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testUpdateDynamicThreadPool(): Unit = {
-    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     origProps.put(ServerConfigs.NUM_IO_THREADS_CONFIG, "4")
     origProps.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2")
     origProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
@@ -205,7 +205,7 @@ class DynamicBrokerConfigTest {
   @nowarn("cat=deprecation")
   @Test
   def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
-    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
     val config = KafkaConfig(origProps)
     config.dynamicConfig.initialize(None, None)
@@ -227,7 +227,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
-    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
"100000000")
     val config = KafkaConfig(origProps)
     config.dynamicConfig.initialize(None, None)
@@ -261,7 +261,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testReconfigurableValidation(): Unit = {
-    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     val config = KafkaConfig(origProps)
     val invalidReconfigurableProps = 
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, ServerConfigs.BROKER_ID_CONFIG, 
"some.prop")
     val validReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "some.prop")
@@ -331,7 +331,7 @@ class DynamicBrokerConfigTest {
   }
 
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: 
Boolean, expectFailure: Boolean): Unit = {
-    val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val configProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"broker.secret")
     val config = KafkaConfig(configProps)
     config.dynamicConfig.initialize(None, None)
@@ -440,7 +440,7 @@ class DynamicBrokerConfigTest {
   def testDynamicListenerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
-    val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
     when(kafkaServer.config).thenReturn(oldConfig)
 
     props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
@@ -480,11 +480,11 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testAuthorizerConfig(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+    val props = TestUtils.createBrokerConfig(0, null, port = 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
     oldConfig.dynamicConfig.initialize(None, None)
 
-    val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
     when(kafkaServer.config).thenReturn(oldConfig)
     
when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
     val metrics: Metrics = mock(classOf[Metrics])
@@ -630,7 +630,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testImproperConfigsAreRemoved(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props = TestUtils.createBrokerConfig(0, null)
     val config = KafkaConfig(props)
     config.dynamicConfig.initialize(None, None)
 
@@ -659,7 +659,7 @@ class DynamicBrokerConfigTest {
   @Test
   def testUpdateMetricReporters(): Unit = {
     val brokerId = 0
-    val origProps = TestUtils.createBrokerConfig(brokerId, 
TestUtils.MockZkConnect, port = 8181)
+    val origProps = TestUtils.createBrokerConfig(brokerId, null, port = 8181)
 
     val config = KafkaConfig(origProps)
     val serverMock = Mockito.mock(classOf[KafkaBroker])
@@ -684,7 +684,7 @@ class DynamicBrokerConfigTest {
   @nowarn("cat=deprecation")
   def testUpdateMetricReportersNoJmxReporter(): Unit = {
     val brokerId = 0
-    val origProps = TestUtils.createBrokerConfig(brokerId, 
TestUtils.MockZkConnect, port = 8181)
+    val origProps = TestUtils.createBrokerConfig(brokerId, null, port = 8181)
     origProps.put(MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false")
 
     val config = KafkaConfig(origProps)
@@ -711,8 +711,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
-    val props = new Properties()
-    props.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "1024")
     val config = new KafkaConfig(props)
     
assertFalse(config.nonInternalValues.containsKey(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG))
@@ -722,10 +721,10 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testDynamicLogLocalRetentionMsConfig(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaBroker]))
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -745,10 +744,10 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testDynamicLogLocalRetentionSizeConfig(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaBroker]))
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -768,7 +767,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testDynamicLogLocalRetentionSkipsOnInvalidConfig(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
     props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
     val config = KafkaConfig(props)
@@ -794,7 +793,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     val config = KafkaConfig(props)
     val kafkaBroker = mock(classOf[KafkaBroker])
     when(kafkaBroker.config).thenReturn(config)
@@ -828,7 +827,7 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
-    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
port = 8181)
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
     
origProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
 "2")
 
     val config = KafkaConfig(origProps)
@@ -853,9 +852,9 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+    val props = TestUtils.createBrokerConfig(0, null, port = 9092)
     val config = KafkaConfig.fromProps(props)
-    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
     val remoteLogManager = mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
@@ -884,9 +883,9 @@ class DynamicBrokerConfigTest {
 
   @Test
   def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+    val props = TestUtils.createBrokerConfig(0, null, port = 9092)
     val config = KafkaConfig.fromProps(props)
-    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
     val remoteLogManager = mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
@@ -919,9 +918,9 @@ class DynamicBrokerConfigTest {
     val copyQuotaProp = 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
     val fetchQuotaProp = 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
 
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+    val props = TestUtils.createBrokerConfig(0, null, port = 9092)
     val config = KafkaConfig.fromProps(props)
-    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
     val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
@@ -969,11 +968,11 @@ class DynamicBrokerConfigTest {
                                             retentionMs: Long,
                                             logLocalRetentionBytes: Long,
                                             retentionBytes: Long): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+    val props = TestUtils.createBrokerConfig(0, null, port = 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
retentionMs.toString)
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, 
retentionBytes.toString)
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaBroker]))
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -985,6 +984,51 @@ class DynamicBrokerConfigTest {
     // validate per broker config
     assertThrows(classOf[ConfigException], () =>  
config.dynamicConfig.validate(newProps, perBrokerConfig = true))
   }
+
+  class DynamicLogConfigContext(origProps: Properties) {
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[BrokerServer])
+    val logManagerMock = Mockito.mock(classOf[LogManager])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+    Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+
+    val currentDefaultLogConfig = new AtomicReference(new LogConfig(new 
Properties))
+    Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => 
currentDefaultLogConfig.get())
+    
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
+      .thenAnswer(invocation => 
currentDefaultLogConfig.set(invocation.getArgument(0)))
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicLogConfig(logManagerMock, serverMock))
+  }
+
+  @Test
+  def testDynamicLogConfigHandlesSynonymsCorrectly(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+    origProps.put(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, "1")
+    val ctx = new DynamicLogConfigContext(origProps)
+    assertEquals(TimeUnit.MINUTES.toMillis(1), 
ctx.config.logRetentionTimeMillis)
+
+    val props = new Properties()
+    props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "12345678")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(TimeUnit.MINUTES.toMillis(1), 
ctx.currentDefaultLogConfig.get().retentionMs)
+  }
+
+  @Test
+  def testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+    origProps.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "1")
+    val ctx = new DynamicLogConfigContext(origProps)
+    assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
+
+    val props = new Properties()
+    props.put(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, "3")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
+    
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG))
+  }
 }
 
 class TestDynamicThreadPool() extends BrokerReconfigurable {

Reply via email to