This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch KAFKA-17584 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b182d4432451c1f961dcfbb90b84dacf72217efc Author: Colin P. McCabe <[email protected]> AuthorDate: Mon Sep 23 16:13:00 2024 -0700 KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations Several Kafka log configurations in have synonyms. For example, log retention can be configured either by log.retention.ms, or by log.retention.minutes, or by log.retention.hours. There is also a faculty in Kafka to dynamically change broker configurations without restarting the broker. These dynamically set configurations are stored in the metadata log and override what is in the broker properties file. Unfortunately, these two features interacted poorly; there was a bug where the dynamic log configuration update code ignored synonyms. For example, if you set log.retention.minutes and then reconfigured something unrelated that triggered the LogConfig update path, the retention value that you had configured was overwritten. The reason for this was incorrect handling of synonyms. The code tried to treat the Kafka broker configuration as a bag of key/value entities rather than extracting the correct retention time (or other setting with overrides) from the KafkaConfig object. Separately from the above bug, the code did not honor the value of dynamically configured synonyms: setting log.retention.minutes had no effect; only log.retention.ms was honored. --- .../scala/kafka/server/DynamicBrokerConfig.scala | 39 +++++++++------- .../kafka/server/KRaftClusterTest.scala | 39 +++++++++++++++- .../kafka/server/DynamicBrokerConfigTest.scala | 53 ++++++++++++++++++---- .../test/scala/unit/kafka/utils/TestUtils.scala | 12 +++-- 4 files changed, 111 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index dbae5d28373..598ff949d10 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -662,12 +662,27 @@ trait BrokerReconfigurable { } object DynamicLogConfig { - // Exclude message.format.version for now since we need to check that the version - // is supported on all brokers in the cluster. - val ReconfigurableConfigs: Set[String] = - ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG - val KafkaConfigToLogConfigName: Map[String, String] = - ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) } + val ReconfigurableConfigs: Set[String] = { + val results = new util.HashSet[String] + ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS.values().forEach(v => + v.forEach(configSynonym => results.add(configSynonym.name()))) + + // Exclude message.format.version for now since we need to check that the version + // is supported on all brokers in the cluster. + results.remove(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG) + + results.asScala + } + + val KafkaConfigToLogConfigName: Map[String, String] = { + val results = new util.HashMap[String, String] + ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS.entrySet().forEach(e => { + e.getValue.forEach(configSynonym => { + results.put(configSynonym.name(), e.getKey) + }) + }) + results.asScala + } } class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging { @@ -732,17 +747,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { val originalLogConfig = logManager.currentDefaultConfig val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable - val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals) - newConfig.valuesFromThisConfig.forEach { (k, v) => - if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { - DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => - if (v == null) - newBrokerDefaults.remove(configName) - else - newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) - } - } - } + val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap) logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults)) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 7e74ced9a02..bdce411a13b 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -46,7 +46,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, MetadataVersion} -import org.apache.kafka.server.config.KRaftConfigs +import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.quota import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType} @@ -64,7 +64,7 @@ import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionExcept import java.util.concurrent.atomic.AtomicInteger import java.util.{Collections, Optional, OptionalLong, Properties} import scala.annotation.nowarn -import scala.collection.mutable +import scala.collection.{Seq, mutable} import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS} import scala.jdk.CollectionConverters._ @@ -1623,6 +1623,41 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testDynamicLogConfigs(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV0). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()). + setConfigProp(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "100"). + setConfigProp(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "10000"). + build() + try { + cluster.format() + cluster.startup() + val admin = Admin.create(cluster.clientProperties()) + try { + admin.createTopics(util.Arrays.asList( + new NewTopic("foo", 1, 1.toShort))).all().get() + def numSegments(): Int = { + val broker = cluster.brokers().get(0) + val log = broker.logManager.getLog(new TopicPartition("foo", 0)).get + log.numberOfSegments + } + do { + TestUtils.generateAndProduceMessages(cluster.brokers().values().asScala.toSeq, + "foo", 100, listenerName = ListenerName.normalised("EXTERNAL")) + } while (numSegments() < 1) + TestUtils.waitUntilTrue(() => numSegments() < 2, "waiting for the number of segments to be less than 2.") + } finally { + admin.close() + } + } finally { + cluster.close() + } + } } class BadAuthorizer extends Authorizer { diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 716a361f722..64cdbbf79a2 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -19,7 +19,7 @@ package kafka.server import java.{lang, util} import java.util.{Properties, Map => JMap} -import java.util.concurrent.CompletionStage +import java.util.concurrent.{CompletionStage, TimeUnit} import java.util.concurrent.atomic.AtomicReference import kafka.controller.KafkaController import kafka.log.LogManager @@ -440,7 +440,7 @@ class DynamicBrokerConfigTest { def testDynamicListenerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer]) + val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker]) when(kafkaServer.config).thenReturn(oldConfig) props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093") @@ -484,7 +484,7 @@ class DynamicBrokerConfigTest { val oldConfig = KafkaConfig.fromProps(props) oldConfig.dynamicConfig.initialize(None, None) - val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer]) + val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker]) when(kafkaServer.config).thenReturn(oldConfig) when(kafkaServer.kafkaYammerMetrics).thenReturn(KafkaYammerMetrics.INSTANCE) val metrics: Metrics = mock(classOf[Metrics]) @@ -725,7 +725,7 @@ class DynamicBrokerConfigTest { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000") val config = KafkaConfig(props) - val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer])) + val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) @@ -748,7 +748,7 @@ class DynamicBrokerConfigTest { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296") val config = KafkaConfig(props) - val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer])) + val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) @@ -890,7 +890,7 @@ class DynamicBrokerConfigTest { def testRemoteLogManagerCopyQuotaUpdates(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) - val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val serverMock: KafkaBroker = mock(classOf[KafkaBroker]) val remoteLogManager = mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) @@ -921,7 +921,7 @@ class DynamicBrokerConfigTest { def testRemoteLogManagerFetchQuotaUpdates(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) - val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val serverMock: KafkaBroker = mock(classOf[KafkaBroker]) val remoteLogManager = mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) @@ -956,7 +956,7 @@ class DynamicBrokerConfigTest { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val config = KafkaConfig.fromProps(props) - val serverMock: KafkaServer = mock(classOf[KafkaServer]) + val serverMock: KafkaBroker = mock(classOf[KafkaBroker]) val remoteLogManager = Mockito.mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) @@ -1008,7 +1008,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString) props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString) val config = KafkaConfig(props) - val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer])) + val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) @@ -1020,6 +1020,41 @@ class DynamicBrokerConfigTest { // validate per broker config assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) } + + @Test + def testDynamicLogReconfigurableConfigsIncludesDeprecatedSynonyms(): Unit = { + assertTrue(DynamicLogConfig.ReconfigurableConfigs.contains(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG)) + assertTrue(DynamicLogConfig.ReconfigurableConfigs.contains(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG)) + assertTrue(DynamicLogConfig.ReconfigurableConfigs.contains(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG)) + } + + @Test + def testDynamicLogConfigHandlesSynonymsCorrectly(): Unit = { + val origProps = TestUtils.createBrokerConfig(0, null, port = 8181) + origProps.put(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, "1") + + val config = KafkaConfig(origProps) + assertEquals(TimeUnit.MINUTES.toMillis(1), config.logRetentionTimeMillis) + val serverMock = Mockito.mock(classOf[BrokerServer]) + val logManagerMock = Mockito.mock(classOf[LogManager]) + + Mockito.when(serverMock.config).thenReturn(config) + Mockito.when(serverMock.logManager).thenReturn(logManagerMock) + Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty) + + val currentDefaultLogConfig = new AtomicReference(new LogConfig(new Properties)) + Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get()) + Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig]))) + .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0))) + + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock)) + + val props = new Properties() + props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, "12345678") + config.dynamicConfig.updateDefaultConfig(props) + assertEquals(TimeUnit.MINUTES.toMillis(1), currentDefaultLogConfig.get().retentionMs) + } } class TestDynamicThreadPool() extends BrokerReconfigurable { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index f503caf43c4..ff0da0b4ed6 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1267,8 +1267,10 @@ object TestUtils extends Logging { def produceMessages[B <: KafkaBroker]( brokers: Seq[B], records: Seq[ProducerRecord[Array[Byte], Array[Byte]]], - acks: Int = -1): Unit = { - val producer = createProducer(plaintextBootstrapServers(brokers), acks = acks) + acks: Int = -1, + listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + ): Unit = { + val producer = createProducer(bootstrapServers(brokers, listenerName), acks = acks) try { val futures = records.map(producer.send) futures.foreach(_.get) @@ -1284,13 +1286,15 @@ object TestUtils extends Logging { brokers: Seq[B], topic: String, numMessages: Int, - acks: Int = -1): Seq[String] = { + acks: Int = -1, + listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + ): Seq[String] = { val values = (0 until numMessages).map(x => s"test-$x") val intSerializer = new IntegerSerializer() val records = values.zipWithIndex.map { case (v, i) => new ProducerRecord(topic, intSerializer.serialize(topic, i), v.getBytes) } - produceMessages(brokers, records, acks) + produceMessages(brokers, records, acks, listenerName) values }
