This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 1c4b02a6fbf KAFKA-17584: Fix incorrect synonym handling for dynamic 
log configurations
1c4b02a6fbf is described below

commit 1c4b02a6fbf0fecdcd0f69656ad8bff1454ad57b
Author: Christo Lolov <[email protected]>
AuthorDate: Mon Nov 25 18:07:29 2024 +0000

    KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations
    
    This is a cherry-pick of #17258 to 3.7.2
    
    This commit differs from the original by using the old (i.e., 3.7) 
references to the configurations and by not changing as many unit tests
    
    Reviewers: Divij Vaidya <[email protected]>, Colin Patrick McCabe 
<[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 41 ++++++++++++---------
 .../server/DynamicBrokerReconfigurationTest.scala  |  5 ++-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 42 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 142349fcfd6..95a531ed89d 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@@ -667,13 +667,25 @@ trait BrokerReconfigurable {
 }
 
 object DynamicLogConfig {
-  // Exclude message.format.version for now since we need to check that the 
version
-  // is supported on all brokers in the cluster.
+  /**
+   * The log configurations that are non-reconfigurable. This set contains the 
names you
+   * would use when setting a dynamic configuration on a topic, which are 
different than the
+   * corresponding broker configuration names.
+   *
+   * For now, message.format.version is not reconfigurable, since we need to 
check that
+   * the version is supported on all brokers in the cluster.
+   */
   @nowarn("cat=deprecation")
-  val ExcludedConfigs = Set(KafkaConfig.LogMessageFormatVersionProp)
+  val NonReconfigrableLogConfigs: Set[String] = 
Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
 
-  val ReconfigurableConfigs = 
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet -- 
ExcludedConfigs
-  val KafkaConfigToLogConfigName = 
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => 
(v, k) }
+  /**
+   * The broker configurations pertaining to logs that are reconfigurable. 
This set contains
+   * the names you would use when setting a static or dynamic broker 
configuration (not topic
+   * configuration).
+   */
+  val ReconfigurableConfigs: Set[String] =
+    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.
+      filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet
 }
 
 class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends 
BrokerReconfigurable with Logging {
@@ -738,17 +750,14 @@ class DynamicLogConfig(logManager: LogManager, server: 
KafkaBroker) extends Brok
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     val originalLogConfig = logManager.currentDefaultConfig
     val originalUncleanLeaderElectionEnable = 
originalLogConfig.uncleanLeaderElectionEnable
-    val newBrokerDefaults = new util.HashMap[String, 
Object](originalLogConfig.originals)
-    newConfig.valuesFromThisConfig.forEach { (k, v) =>
-      if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
-        DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { 
configName =>
-          if (v == null)
-             newBrokerDefaults.remove(configName)
-          else
-            newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
-        }
+    val newBrokerDefaults = new util.HashMap[String, 
Object](newConfig.extractLogConfigMap)
+    val originalLogConfigMap = originalLogConfig.originals()
+    DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
+      Option(originalLogConfigMap.get(k)) match {
+        case None => newBrokerDefaults.remove(k)
+        case Some(v) => newBrokerDefaults.put(k, v)
       }
-    }
+    })
 
     logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
 
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 55714993631..96e1d223a07 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.MetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.ShutdownableThread
@@ -663,8 +664,10 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 
0)).getOrElse(throw new IllegalStateException("Log not found"))
     TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing 
topic config using defaults not updated")
+    val KafkaConfigToLogConfigName: Map[String, String] =
+      ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, 
v) => (v, k) }
     props.asScala.foreach { case (k, v) =>
-      val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
+      val logConfigName = KafkaConfigToLogConfigName(k)
       val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" 
else v
       assertEquals(expectedValue, 
log.config.originals.get(logConfigName).toString,
         s"Not reconfigured $logConfigName for existing log")
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index e6537edb025..3ef3c7eb9b2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.{lang, util}
 import java.util.{Properties, Map => JMap}
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{CompletionStage, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import kafka.controller.KafkaController
 import kafka.log.LogManager
@@ -833,6 +833,46 @@ class DynamicBrokerConfigTest {
     // validate per broker config
     assertThrows(classOf[ConfigException], () =>  
config.dynamicConfig.validate(newProps, perBrokerConfig = true))
   }
+
+  class DynamicLogConfigContext(origProps: Properties) {
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[BrokerServer])
+    val logManagerMock = Mockito.mock(classOf[LogManager])
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+    Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+    val currentDefaultLogConfig = new AtomicReference(new LogConfig(new 
Properties))
+    Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => 
currentDefaultLogConfig.get())
+    
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
+      .thenAnswer(invocation => 
currentDefaultLogConfig.set(invocation.getArgument(0)))
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicLogConfig(logManagerMock, serverMock))
+  }
+
+  @Test
+  def testDynamicLogConfigHandlesSynonymsCorrectly(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+    origProps.put(KafkaConfig.LogRetentionTimeMinutesProp, "1")
+    val ctx = new DynamicLogConfigContext(origProps)
+    assertEquals(TimeUnit.MINUTES.toMillis(1), 
ctx.config.logRetentionTimeMillis)
+    val props = new Properties()
+    props.put(KafkaConfig.MessageMaxBytesProp, "12345678")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(TimeUnit.MINUTES.toMillis(1), 
ctx.currentDefaultLogConfig.get().retentionMs)
+  }
+
+  @Test
+  def testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+    origProps.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+    val ctx = new DynamicLogConfigContext(origProps)
+    assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
+    val props = new Properties()
+    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "3")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
+    
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(KafkaConfig.LogRetentionTimeHoursProp))
+  }
 }
 
 class TestDynamicThreadPool() extends BrokerReconfigurable {

Reply via email to