This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 6df29771ba7 KAFKA-18405 Remove ZooKeeper logic from DynamicBrokerConfig (#18508)
6df29771ba7 is described below

commit 6df29771ba7929d8c92e004c78fc310217a153de
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Thu Jan 16 19:23:59 2025 +0800

    KAFKA-18405 Remove ZooKeeper logic from DynamicBrokerConfig (#18508)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ControllerServer.scala |  2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 78 +++-------------------
 .../src/main/scala/kafka/server/SharedServer.scala |  2 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  6 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 64 +++++++++++-------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 +-
 7 files changed, 55 insertions(+), 103 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 40de268a25b..8c19fcbf709 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -193,7 +193,7 @@ class BrokerServer(
       info("Starting broker")
 
       val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
-      config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin))
+      config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
 
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 16e1efd39d1..d7a39449068 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -123,7 +123,7 @@ class ControllerServer(
     try {
       this.logIdent = logContext.logPrefix()
       info("Starting controller")
-      config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
+      config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
 
       maybeChangeStatus(STARTING, STARTED)
 
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ea99be3bcb1..32febffb546 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
-import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
@@ -39,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
+import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
 import org.apache.kafka.server.telemetry.ClientTelemetry
@@ -58,8 +57,6 @@ import scala.jdk.CollectionConverters._
   * </ul>
   * The order of precedence for broker configs is:
   * <ol>
-  *   <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
-  *   <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/&lt;default&gt;</li>
   *   <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
   *   <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
   * </ol>
@@ -215,17 +212,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private var currentConfig: KafkaConfig = _
   private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP)
 
-  private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
+  private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
     currentConfig = new KafkaConfig(kafkaConfig.props, false)
     metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt
-
-    zkClientOpt.foreach { zkClient =>
-      val adminZkClient = new AdminZkClient(zkClient)
-      updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false)
-      val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString)
-      val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
-      updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
-    }
   }
 
   /**
@@ -427,14 +416,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     props
   }
 
-  // If the secret has changed, password.encoder.old.secret contains the old secret that was used
-  // to encode the configs in ZK. Decode passwords using the old secret and update ZK with values
-  // encoded using the current secret. Ignore any errors during decoding since old secret may not
-  // have been removed during broker restart.
-  private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
-    persistentProps.clone().asInstanceOf[Properties]
-  }
-
   /**
    * Validate the provided configs `propsOverride` and return the full Kafka configs with
    * the configured defaults and these overrides.
@@ -900,7 +881,6 @@ object DynamicListenerConfig {
    */
   val ReconfigurableConfigs = Set(
     // Listener configs
-    SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
     SocketServerConfigs.LISTENERS_CONFIG,
     SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 
@@ -986,40 +966,16 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     DynamicListenerConfig.ReconfigurableConfigs
   }
 
-  private def listenerRegistrationsAltered(
-    oldAdvertisedListeners: Map[ListenerName, EndPoint],
-    newAdvertisedListeners: Map[ListenerName, EndPoint]
-  ): Boolean = {
-    if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
-    oldAdvertisedListeners.foreachEntry {
-      case (oldListenerName, oldEndpoint) =>
-        newAdvertisedListeners.get(oldListenerName) match {
-          case None => return true
-          case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) {
-            return true
-          }
-        }
-    }
-    false
-  }
-
-  private def verifyListenerRegistrationAlterationSupported(): Unit = {
-    if (!server.config.requiresZookeeper) {
-      throw new ConfigException("Advertised listeners cannot be altered when using a " +
-        "Raft-based metadata quorum.")
-    }
-  }
-
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     val oldConfig = server.config
-    val newListeners = listenersToMap(newConfig.listeners)
-    val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
-    val oldListeners = listenersToMap(oldConfig.listeners)
-    if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
-      throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
-    if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
+    val newListeners = newConfig.listeners.map(_.listenerName).toSet
+    val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
+    val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
+    if (!oldAdvertisedListeners.subsetOf(newListeners))
+      throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
+    if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
       throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
-    newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
+    newListeners.intersect(oldListeners).foreach { listenerName =>
       def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
         kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
           // skip the reconfigurable configs
@@ -1032,15 +988,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
       if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
         throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
     }
-    if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
-      throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
-
-    // Currently, we do not support adding or removing listeners when in KRaft mode.
-    // However, we support changing other listener configurations (max connections, etc.)
-    if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
-        listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
-      verifyListenerRegistrationAlterationSupported()
-    }
   }
 
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
@@ -1055,13 +1002,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
       if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
       if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
     }
-    if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
-        listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
-      verifyListenerRegistrationAlterationSupported()
-      server match {
-        case _ => throw new RuntimeException("Unable to handle reconfigure")
-      }
-    }
   }
 
   private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 45c9b10b026..66af33c1697 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -268,7 +268,7 @@ class SharedServer(
           // This is only done in tests.
           metrics = new Metrics()
         }
-        sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
+        sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
 
         if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
           brokerMetrics = new BrokerServerMetrics(metrics)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 957677c2b41..cc8d16b2e7c 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -17,7 +17,7 @@
 
 package kafka.server.metadata
 
-import java.util.{OptionalInt, Properties}
+import java.util.OptionalInt
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.LogManager
 import kafka.server.{KafkaConfig, ReplicaManager}
@@ -243,10 +243,6 @@ class BrokerMetadataPublisher(
     }
   }
 
-  def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
-    config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-  }
-
   /**
    * Update the coordinator of local replica changes: election and resignation.
    *
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 09555351c67..10b42f96b4e 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.authorizer._
@@ -57,7 +58,7 @@ class DynamicBrokerConfigTest {
     props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
     val config = KafkaConfig(props)
     val dynamicConfig = config.dynamicConfig
-    dynamicConfig.initialize(None, None)
+    dynamicConfig.initialize(None)
 
     assertEquals(config, dynamicConfig.currentKafkaConfig)
     assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -123,7 +124,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
     Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new BrokerDynamicThreadPool(serverMock))
     config.dynamicConfig.addReconfigurable(acceptorMock)
 
@@ -179,7 +180,7 @@ class DynamicBrokerConfigTest {
     when(serverMock.config).thenReturn(config)
     when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
 
     // Test dynamic update with valid values
@@ -215,7 +216,7 @@ class DynamicBrokerConfigTest {
     when(serverMock.config).thenReturn(config)
     when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
 
     // Test dynamic update with invalid values
@@ -246,7 +247,7 @@ class DynamicBrokerConfigTest {
     val origProps = TestUtils.createBrokerConfig(0, port = 8181)
     origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
     val config = KafkaConfig(origProps)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
 
     val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
 
@@ -265,7 +266,7 @@ class DynamicBrokerConfigTest {
     val origProps = TestUtils.createBrokerConfig(0, port = 8181)
     origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000")
     val config = KafkaConfig(origProps)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
 
     val validProps = Map.empty[String, String]
     val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20")
@@ -368,7 +369,7 @@ class DynamicBrokerConfigTest {
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
     val configProps = TestUtils.createBrokerConfig(0, port = 8181)
     val config = KafkaConfig(configProps)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
 
     val props = new Properties
     props.put(name, value)
@@ -459,7 +460,7 @@ class DynamicBrokerConfigTest {
   def testAuthorizerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, port = 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
-    oldConfig.dynamicConfig.initialize(None, None)
+    oldConfig.dynamicConfig.initialize(None)
 
     val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
     when(kafkaServer.config).thenReturn(oldConfig)
@@ -505,7 +506,7 @@ class DynamicBrokerConfigTest {
   def testCombinedControllerAuthorizerConfig(): Unit = {
     val props = createCombinedControllerConfig(0, 9092)
     val oldConfig = KafkaConfig.fromProps(props)
-    oldConfig.dynamicConfig.initialize(None, None)
+    oldConfig.dynamicConfig.initialize(None)
 
     val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer])
     when(controllerServer.config).thenReturn(oldConfig)
@@ -550,7 +551,7 @@ class DynamicBrokerConfigTest {
   def testIsolatedControllerAuthorizerConfig(): Unit = {
     val props = createIsolatedControllerConfig(0, port = 9092)
     val oldConfig = KafkaConfig.fromProps(props)
-    oldConfig.dynamicConfig.initialize(None, None)
+    oldConfig.dynamicConfig.initialize(None)
 
     val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer])
     when(controllerServer.config).thenReturn(oldConfig)
@@ -589,7 +590,7 @@ class DynamicBrokerConfigTest {
   def testImproperConfigsAreRemoved(): Unit = {
     val props = TestUtils.createBrokerConfig(0)
     val config = KafkaConfig(props)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
 
     assertEquals(SocketServerConfigs.MAX_CONNECTIONS_DEFAULT, config.maxConnections)
     assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES, config.messageMaxBytes)
@@ -624,7 +625,7 @@ class DynamicBrokerConfigTest {
 
     Mockito.when(serverMock.config).thenReturn(config)
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
     config.dynamicConfig.addReconfigurable(m)
     assertEquals(1, m.currentReporters.size)
@@ -649,7 +650,7 @@ class DynamicBrokerConfigTest {
 
     Mockito.when(serverMock.config).thenReturn(config)
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
     config.dynamicConfig.addReconfigurable(m)
     assertTrue(m.currentReporters.isEmpty)
@@ -681,7 +682,7 @@ class DynamicBrokerConfigTest {
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
     val config = KafkaConfig(props)
     val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
     val newProps = new Properties()
@@ -704,7 +705,7 @@ class DynamicBrokerConfigTest {
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
     val config = KafkaConfig(props)
     val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
     val newProps = new Properties()
@@ -727,7 +728,7 @@ class DynamicBrokerConfigTest {
     props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
     props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
     val config = KafkaConfig(props)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
 
     // Check for invalid localRetentionMs < -2
     verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3"))
@@ -757,7 +758,7 @@ class DynamicBrokerConfigTest {
     assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
 
     val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
 
     val newProps = new Properties()
@@ -792,7 +793,7 @@ class DynamicBrokerConfigTest {
       config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
 
     val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
 
     val newProps = new Properties()
@@ -828,7 +829,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.config).thenReturn(config)
     Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
 
     val props = new Properties()
@@ -851,7 +852,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.config).thenReturn(config)
     Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
 
     assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
@@ -882,7 +883,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.config).thenReturn(config)
     Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
 
     assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
@@ -917,7 +918,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.config).thenReturn(config)
     Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
 
     // Default values
@@ -964,7 +965,7 @@ class DynamicBrokerConfigTest {
     props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
     val config = KafkaConfig(props)
     val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
 
     val newProps = new Properties()
@@ -990,7 +991,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
       .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
 
-    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.initialize(None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
   }
 
@@ -1020,6 +1021,21 @@ class DynamicBrokerConfigTest {
     assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
     assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG))
   }
+
+  @Test
+  def testAdvertisedListenersIsNotDynamicallyReconfigurable(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, port = 8181)
+    val ctx = new DynamicLogConfigContext(origProps)
+
+    // update advertised listeners should not work
+    val props = new Properties()
+    props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "SASL_PLAINTEXT://localhost:8181")
+    ctx.config.dynamicConfig.updateDefaultConfig(props)
+    ctx.config.effectiveAdvertisedBrokerListeners.foreach(e =>
+      assertEquals(SecurityProtocol.PLAINTEXT.name, e.listenerName.value)
+    )
+    assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG))
+  }
 }
 
 class TestDynamicThreadPool extends BrokerReconfigurable {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a3081f17ed3..6370aa546d6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2539,7 +2539,7 @@ class ReplicaManagerTest {
       verify(addPartitionsToTxnManager, times(0)).addOrVerifyTransaction(any(), any(), any(), any(), any(), any())
 
       // Dynamically enable verification.
-      config.dynamicConfig.initialize(None, None)
+      config.dynamicConfig.initialize(None)
       val props = new Properties()
       props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
@@ -2591,7 +2591,7 @@ class ReplicaManagerTest {
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
 
       // Disable verification
-      config.dynamicConfig.initialize(None, None)
+      config.dynamicConfig.initialize(None)
       val props = new Properties()
       props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
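
For reference, the user-visible API change in this commit is the signature of DynamicBrokerConfig.initialize, which drops the ZooKeeper client parameter. A minimal Scala sketch of the call sites, drawn from the hunks above (illustrative only, not part of the patch itself):

    // Before: callers passed an Option[KafkaZkClient], which was always None in KRaft mode
    config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin))

    // After: only the optional ClientMetricsReceiverPlugin remains
    config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))

    // Callers without a plugin (ControllerServer, SharedServer, tests) now pass a single None
    config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)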
