This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 25f2ed090a7 KAFKA-18405 Remove ZooKeeper logic from DynamicBrokerConfig (#18508) 25f2ed090a7 is described below commit 25f2ed090a7fc815225e2161bff964488f101238 Author: PoAn Yang <pay...@apache.org> AuthorDate: Thu Jan 16 19:23:59 2025 +0800 KAFKA-18405 Remove ZooKeeper logic from DynamicBrokerConfig (#18508) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../src/main/scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/ControllerServer.scala | 2 +- .../scala/kafka/server/DynamicBrokerConfig.scala | 78 +++------------------- .../src/main/scala/kafka/server/SharedServer.scala | 2 +- .../server/metadata/BrokerMetadataPublisher.scala | 6 +- .../kafka/server/DynamicBrokerConfigTest.scala | 64 +++++++++++------- .../unit/kafka/server/ReplicaManagerTest.scala | 4 +- 7 files changed, 55 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 85a2219adce..22a135d1b45 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -193,7 +193,7 @@ class BrokerServer( info("Starting broker") val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin() - config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin)) + config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin)) /* start scheduler */ kafkaScheduler = new KafkaScheduler(config.backgroundThreads) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index c9947081b2d..76ffffe53b3 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -124,7 +124,7 @@ class ControllerServer( try { this.logIdent = logContext.logPrefix() info("Starting controller") - config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None) + config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None) maybeChangeStatus(STARTING, STARTED) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index ea99be3bcb1..32febffb546 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.server.DynamicBrokerConfig._ import kafka.utils.{CoreUtils, Logging} -import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} @@ -39,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.server.ProcessRole -import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals} +import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs} import org.apache.kafka.server.telemetry.ClientTelemetry @@ -58,8 +57,6 @@ import scala.jdk.CollectionConverters._ * </ul> * The order of precedence for broker configs is: * <ol> - * <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li> - * <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/<default></li> * <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li> * <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li> * </ol> @@ -215,17 +212,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private var currentConfig: KafkaConfig = _ private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP) - private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { + private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt - - zkClientOpt.foreach { zkClient => - val adminZkClient = new AdminZkClient(zkClient) - updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false) - val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString) - val brokerConfig = maybeReEncodePasswords(props, adminZkClient) - updateBrokerConfig(kafkaConfig.brokerId, brokerConfig) - } } /** @@ -427,14 +416,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging props } - // If the secret has changed, password.encoder.old.secret contains the old secret that was used - // to encode the configs in ZK. Decode passwords using the old secret and update ZK with values - // encoded using the current secret. Ignore any errors during decoding since old secret may not - // have been removed during broker restart. - private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { - persistentProps.clone().asInstanceOf[Properties] - } - /** * Validate the provided configs `propsOverride` and return the full Kafka configs with * the configured defaults and these overrides. @@ -900,7 +881,6 @@ object DynamicListenerConfig { */ val ReconfigurableConfigs = Set( // Listener configs - SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, SocketServerConfigs.LISTENERS_CONFIG, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, @@ -986,40 +966,16 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi DynamicListenerConfig.ReconfigurableConfigs } - private def listenerRegistrationsAltered( - oldAdvertisedListeners: Map[ListenerName, EndPoint], - newAdvertisedListeners: Map[ListenerName, EndPoint] - ): Boolean = { - if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true - oldAdvertisedListeners.foreachEntry { - case (oldListenerName, oldEndpoint) => - newAdvertisedListeners.get(oldListenerName) match { - case None => return true - case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) { - return true - } - } - } - false - } - - private def verifyListenerRegistrationAlterationSupported(): Unit = { - if (!server.config.requiresZookeeper) { - throw new ConfigException("Advertised listeners cannot be altered when using a " + - "Raft-based metadata quorum.") - } - } - def validateReconfiguration(newConfig: KafkaConfig): Unit = { val oldConfig = server.config - val newListeners = listenersToMap(newConfig.listeners) - val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners) - val oldListeners = listenersToMap(oldConfig.listeners) - if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) - throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'") - if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet)) + val newListeners = newConfig.listeners.map(_.listenerName).toSet + val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet + val oldListeners = oldConfig.listeners.map(_.listenerName).toSet + if (!oldAdvertisedListeners.subsetOf(newListeners)) + throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'") + if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet)) throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'") - newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName => + newListeners.intersect(oldListeners).foreach { listenerName => def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = { kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) => // skip the reconfigurable configs @@ -1032,15 +988,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName)) throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName") } - if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName)) - throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}") - - // Currently, we do not support adding or removing listeners when in KRaft mode. - // However, we support changing other listener configurations (max connections, etc.) - if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners), - listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) { - verifyListenerRegistrationAlterationSupported() - } } def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { @@ -1055,13 +1002,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved) if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded) } - if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners), - listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) { - verifyListenerRegistrationAlterationSupported() - server match { - case _ => throw new RuntimeException("Unable to handle reconfigure") - } - } } private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] = diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 45c9b10b026..66af33c1697 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -268,7 +268,7 @@ class SharedServer( // This is only done in tests. metrics = new Metrics() } - sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None) + sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { brokerMetrics = new BrokerServerMetrics(metrics) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 957677c2b41..cc8d16b2e7c 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -17,7 +17,7 @@ package kafka.server.metadata -import java.util.{OptionalInt, Properties} +import java.util.OptionalInt import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.LogManager import kafka.server.{KafkaConfig, ReplicaManager} @@ -243,10 +243,6 @@ class BrokerMetadataPublisher( } } - def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = { - config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) - } - /** * Update the coordinator of local replica changes: election and resignation. * diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 09555351c67..10b42f96b4e 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ @@ -57,7 +58,7 @@ class DynamicBrokerConfigTest { props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore) val config = KafkaConfig(props) val dynamicConfig = config.dynamicConfig - dynamicConfig.initialize(None, None) + dynamicConfig.initialize(None) assertEquals(config, dynamicConfig.currentKafkaConfig) assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) @@ -123,7 +124,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.logManager).thenReturn(logManagerMock) Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new BrokerDynamicThreadPool(serverMock)) config.dynamicConfig.addReconfigurable(acceptorMock) @@ -179,7 +180,7 @@ class DynamicBrokerConfigTest { when(serverMock.config).thenReturn(config) when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Test dynamic update with valid values @@ -215,7 +216,7 @@ class DynamicBrokerConfigTest { when(serverMock.config).thenReturn(config) when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Test dynamic update with invalid values @@ -246,7 +247,7 @@ class DynamicBrokerConfigTest { val origProps = TestUtils.createBrokerConfig(0, port = 8181) origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS") val config = KafkaConfig(origProps) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12") @@ -265,7 +266,7 @@ class DynamicBrokerConfigTest { val origProps = TestUtils.createBrokerConfig(0, port = 8181) origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000") val config = KafkaConfig(origProps) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val validProps = Map.empty[String, String] val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20") @@ -368,7 +369,7 @@ class DynamicBrokerConfigTest { private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = { val configProps = TestUtils.createBrokerConfig(0, port = 8181) val config = KafkaConfig(configProps) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val props = new Properties props.put(name, value) @@ -459,7 +460,7 @@ class DynamicBrokerConfigTest { def testAuthorizerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None, None) + oldConfig.dynamicConfig.initialize(None) val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker]) when(kafkaServer.config).thenReturn(oldConfig) @@ -505,7 +506,7 @@ class DynamicBrokerConfigTest { def testCombinedControllerAuthorizerConfig(): Unit = { val props = createCombinedControllerConfig(0, 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None, None) + oldConfig.dynamicConfig.initialize(None) val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) when(controllerServer.config).thenReturn(oldConfig) @@ -550,7 +551,7 @@ class DynamicBrokerConfigTest { def testIsolatedControllerAuthorizerConfig(): Unit = { val props = createIsolatedControllerConfig(0, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None, None) + oldConfig.dynamicConfig.initialize(None) val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) when(controllerServer.config).thenReturn(oldConfig) @@ -589,7 +590,7 @@ class DynamicBrokerConfigTest { def testImproperConfigsAreRemoved(): Unit = { val props = TestUtils.createBrokerConfig(0) val config = KafkaConfig(props) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) assertEquals(SocketServerConfigs.MAX_CONNECTIONS_DEFAULT, config.maxConnections) assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES, config.messageMaxBytes) @@ -624,7 +625,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId") config.dynamicConfig.addReconfigurable(m) assertEquals(1, m.currentReporters.size) @@ -649,7 +650,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId") config.dynamicConfig.addReconfigurable(m) assertTrue(m.currentReporters.isEmpty) @@ -681,7 +682,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000") val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -704,7 +705,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296") val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -727,7 +728,7 @@ class DynamicBrokerConfigTest { props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000") props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024") val config = KafkaConfig(props) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) // Check for invalid localRetentionMs < -2 verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3")) @@ -757,7 +758,7 @@ class DynamicBrokerConfigTest { assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) val newProps = new Properties() @@ -792,7 +793,7 @@ class DynamicBrokerConfigTest { config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) val newProps = new Properties() @@ -828,7 +829,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) val props = new Properties() @@ -851,7 +852,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, @@ -882,7 +883,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, @@ -917,7 +918,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Default values @@ -964,7 +965,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString) val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -990,7 +991,7 @@ class DynamicBrokerConfigTest { Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig]))) .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0))) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock)) } @@ -1020,6 +1021,21 @@ class DynamicBrokerConfigTest { assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis) assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG)) } + + @Test + def testAdvertisedListenersIsNotDynamicallyReconfigurable(): Unit = { + val origProps = TestUtils.createBrokerConfig(0, port = 8181) + val ctx = new DynamicLogConfigContext(origProps) + + // update advertised listeners should not work + val props = new Properties() + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "SASL_PLAINTEXT://localhost:8181") + ctx.config.dynamicConfig.updateDefaultConfig(props) + ctx.config.effectiveAdvertisedBrokerListeners.foreach(e => + assertEquals(SecurityProtocol.PLAINTEXT.name, e.listenerName.value) + ) + assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)) + } } class TestDynamicThreadPool extends BrokerReconfigurable { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 0aa837f03e5..5365de394e5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2539,7 +2539,7 @@ class ReplicaManagerTest { verify(addPartitionsToTxnManager, times(0)).addOrVerifyTransaction(any(), any(), any(), any(), any(), any()) // Dynamically enable verification. - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val props = new Properties() props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) @@ -2591,7 +2591,7 @@ class ReplicaManagerTest { assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) // Disable verification - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val props = new Properties() props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") config.dynamicConfig.updateBrokerConfig(config.brokerId, props)