This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-17584
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b182d4432451c1f961dcfbb90b84dacf72217efc
Author: Colin P. McCabe <[email protected]>
AuthorDate: Mon Sep 23 16:13:00 2024 -0700

    KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations
    
    Several Kafka log configurations have synonyms. For example, log 
retention can be configured
    either by log.retention.ms, or by log.retention.minutes, or by 
log.retention.hours. There is also
    a facility in Kafka to dynamically change broker configurations without 
restarting the broker. These
    dynamically set configurations are stored in the metadata log and override 
what is in the broker
    properties file.
    
    Unfortunately, these two features interacted poorly; there was a bug where 
the dynamic log
    configuration update code ignored synonyms. For example, if you set 
log.retention.minutes and then
    reconfigured something unrelated that triggered the LogConfig update path, 
the retention value that
    you had configured was overwritten.
    
    The reason for this was incorrect handling of synonyms. The code tried to 
treat the Kafka broker
    configuration as a bag of key/value entities rather than extracting the 
correct retention time (or
    other setting with overrides) from the KafkaConfig object.
    
    Separately from the above bug, the code did not honor the value of 
dynamically configured synonyms:
    setting log.retention.minutes had no effect; only log.retention.ms was 
honored.
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 39 +++++++++-------
 .../kafka/server/KRaftClusterTest.scala            | 39 +++++++++++++++-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 53 ++++++++++++++++++----
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 12 +++--
 4 files changed, 111 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index dbae5d28373..598ff949d10 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -662,12 +662,27 @@ trait BrokerReconfigurable {
 }
 
 object DynamicLogConfig {
-  // Exclude message.format.version for now since we need to check that the 
version
-  // is supported on all brokers in the cluster.
-  val ReconfigurableConfigs: Set[String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - 
ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
-  val KafkaConfigToLogConfigName: Map[String, String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) 
=> (v, k) }
+  val ReconfigurableConfigs: Set[String] = {
+    val results = new util.HashSet[String]
+    ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS.values().forEach(v =>
+      v.forEach(configSynonym => results.add(configSynonym.name())))
+
+    // Exclude message.format.version for now since we need to check that the 
version
+    // is supported on all brokers in the cluster.
+    results.remove(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)
+
+    results.asScala
+  }
+
+  val KafkaConfigToLogConfigName: Map[String, String] = {
+    val results = new util.HashMap[String, String]
+    ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS.entrySet().forEach(e 
=> {
+      e.getValue.forEach(configSynonym => {
+        results.put(configSynonym.name(), e.getKey)
+      })
+    })
+    results.asScala
+  }
 }
 
 class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends 
BrokerReconfigurable with Logging {
@@ -732,17 +747,7 @@ class DynamicLogConfig(logManager: LogManager, server: 
KafkaBroker) extends Brok
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     val originalLogConfig = logManager.currentDefaultConfig
     val originalUncleanLeaderElectionEnable = 
originalLogConfig.uncleanLeaderElectionEnable
-    val newBrokerDefaults = new util.HashMap[String, 
Object](originalLogConfig.originals)
-    newConfig.valuesFromThisConfig.forEach { (k, v) =>
-      if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
-        DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { 
configName =>
-          if (v == null)
-             newBrokerDefaults.remove(configName)
-          else
-            newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
-        }
-      }
-    }
+    val newBrokerDefaults = new util.HashMap[String, 
Object](newConfig.extractLogConfigMap)
 
     logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
 
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 7e74ced9a02..bdce411a13b 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, 
MetadataVersion}
-import org.apache.kafka.server.config.KRaftConfigs
+import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.quota
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
@@ -64,7 +64,7 @@ import java.util.concurrent.{CompletableFuture, 
CompletionStage, ExecutionExcept
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
-import scala.collection.mutable
+import scala.collection.{Seq, mutable}
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
 
@@ -1623,6 +1623,41 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDynamicLogConfigs(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV0).
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).
+        setConfigProp(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
"100").
+        setConfigProp(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "10000").
+        build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.createTopics(util.Arrays.asList(
+          new NewTopic("foo", 1, 1.toShort))).all().get()
+        def numSegments(): Int = {
+          val broker = cluster.brokers().get(0)
+          val log = broker.logManager.getLog(new TopicPartition("foo", 0)).get
+          log.numberOfSegments
+        }
+        do {
+          
TestUtils.generateAndProduceMessages(cluster.brokers().values().asScala.toSeq,
+            "foo", 100, listenerName = ListenerName.normalised("EXTERNAL"))
+        } while (numSegments() < 1)
+        TestUtils.waitUntilTrue(() => numSegments() < 2, "waiting for the 
number of segments to be less than 2.")
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer extends Authorizer {
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 716a361f722..64cdbbf79a2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.{lang, util}
 import java.util.{Properties, Map => JMap}
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{CompletionStage, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import kafka.controller.KafkaController
 import kafka.log.LogManager
@@ -440,7 +440,7 @@ class DynamicBrokerConfigTest {
   def testDynamicListenerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
-    val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
     when(kafkaServer.config).thenReturn(oldConfig)
 
     props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
@@ -484,7 +484,7 @@ class DynamicBrokerConfigTest {
     val oldConfig =  KafkaConfig.fromProps(props)
     oldConfig.dynamicConfig.initialize(None, None)
 
-    val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
     when(kafkaServer.config).thenReturn(oldConfig)
     
when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE)
     val metrics: Metrics = mock(classOf[Metrics])
@@ -725,7 +725,7 @@ class DynamicBrokerConfigTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaBroker]))
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -748,7 +748,7 @@ class DynamicBrokerConfigTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaBroker]))
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -890,7 +890,7 @@ class DynamicBrokerConfigTest {
   def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
     val config = KafkaConfig.fromProps(props)
-    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
     val remoteLogManager = mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
@@ -921,7 +921,7 @@ class DynamicBrokerConfigTest {
   def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
     val config = KafkaConfig.fromProps(props)
-    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
     val remoteLogManager = mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
@@ -956,7 +956,7 @@ class DynamicBrokerConfigTest {
 
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
     val config = KafkaConfig.fromProps(props)
-    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
     val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
@@ -1008,7 +1008,7 @@ class DynamicBrokerConfigTest {
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
retentionMs.toString)
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, 
retentionBytes.toString)
     val config = KafkaConfig(props)
-    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaServer]))
+    val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), 
mock(classOf[KafkaBroker]))
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
@@ -1020,6 +1020,41 @@ class DynamicBrokerConfigTest {
     // validate per broker config
     assertThrows(classOf[ConfigException], () =>  
config.dynamicConfig.validate(newProps, perBrokerConfig = true))
   }
+
+  @Test
+  def testDynamicLogReconfigurableConfigsIncludesDeprecatedSynonyms(): Unit = {
+    
assertTrue(DynamicLogConfig.ReconfigurableConfigs.contains(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG))
+    
assertTrue(DynamicLogConfig.ReconfigurableConfigs.contains(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG))
+    
assertTrue(DynamicLogConfig.ReconfigurableConfigs.contains(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG))
+  }
+
+  @Test
+  def testDynamicLogConfigHandlesSynonymsCorrectly(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+    origProps.put(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, "1")
+
+    val config = KafkaConfig(origProps)
+    assertEquals(TimeUnit.MINUTES.toMillis(1), config.logRetentionTimeMillis)
+    val serverMock = Mockito.mock(classOf[BrokerServer])
+    val logManagerMock = Mockito.mock(classOf[LogManager])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+    Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+
+    val currentDefaultLogConfig = new AtomicReference(new LogConfig(new 
Properties))
+    Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => 
currentDefaultLogConfig.get())
+    
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
+      .thenAnswer(invocation => 
currentDefaultLogConfig.set(invocation.getArgument(0)))
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicLogConfig(logManagerMock, serverMock))
+
+    val props = new Properties()
+    props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "12345678")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(TimeUnit.MINUTES.toMillis(1), 
currentDefaultLogConfig.get().retentionMs)
+  }
 }
 
 class TestDynamicThreadPool() extends BrokerReconfigurable {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f503caf43c4..ff0da0b4ed6 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1267,8 +1267,10 @@ object TestUtils extends Logging {
   def produceMessages[B <: KafkaBroker](
       brokers: Seq[B],
       records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
-      acks: Int = -1): Unit = {
-    val producer = createProducer(plaintextBootstrapServers(brokers), acks = 
acks)
+      acks: Int = -1,
+      listenerName: ListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    ): Unit = {
+    val producer = createProducer(bootstrapServers(brokers, listenerName), 
acks = acks)
     try {
       val futures = records.map(producer.send)
       futures.foreach(_.get)
@@ -1284,13 +1286,15 @@ object TestUtils extends Logging {
       brokers: Seq[B],
       topic: String,
       numMessages: Int,
-      acks: Int = -1): Seq[String] = {
+      acks: Int = -1,
+      listenerName: ListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    ): Seq[String] = {
     val values = (0 until numMessages).map(x =>  s"test-$x")
     val intSerializer = new IntegerSerializer()
     val records = values.zipWithIndex.map { case (v, i) =>
       new ProducerRecord(topic, intSerializer.serialize(topic, i), v.getBytes)
     }
-    produceMessages(brokers, records, acks)
+    produceMessages(brokers, records, acks, listenerName)
     values
   }
 

Reply via email to