This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02d568c83c3 KAFKA-18854: Move part of 
DynamicConfig/DynamicBrokerConfig to server… (#21302)
02d568c83c3 is described below

commit 02d568c83c32b9af425bc23bd506efabb520005f
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Jan 28 18:09:15 2026 +0100

    KAFKA-18854: Move part of DynamicConfig/DynamicBrokerConfig to server… 
(#21302)
    
    … module
    
    This is moving just enough logic so ConfigCommand does not depend on any
    core classes anymore and can be moved to the tools module.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   3 +-
 .../main/scala/kafka/network/SocketServer.scala    |  48 ++--
 .../src/main/scala/kafka/server/ConfigHelper.scala |   6 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 272 +++-----------------
 .../main/scala/kafka/server/DynamicConfig.scala    |  69 -----
 core/src/main/scala/kafka/server/KafkaConfig.scala |   6 +-
 .../unit/kafka/network/ConnectionQuotasTest.scala  |  12 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |  28 +-
 .../org/apache/kafka/network/SocketServer.java     |  42 +++
 .../kafka/server/config/DynamicBrokerConfig.java   | 281 +++++++++++++++++++++
 .../apache/kafka/server/config/DynamicConfig.java  |  70 +++++
 .../server/config/DynamicBrokerConfigTest.java     |  58 +++++
 .../kafka/tools/ConfigCommandIntegrationTest.java  |   2 +-
 13 files changed, 526 insertions(+), 371 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index e731b566f9e..cea7d774525 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,7 +18,6 @@
 package kafka.admin
 
 import joptsimple._
-import kafka.server.DynamicConfig
 import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, 
DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions, 
ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, 
ScramMechanism => PublicScramMechanism}
@@ -30,7 +29,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity,
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.utils.{Exit, Utils}
 import org.apache.kafka.coordinator.group.GroupConfig
-import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
+import org.apache.kafka.server.config.{ConfigType, DynamicConfig, QuotaConfig}
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index b9de13d637f..ebc0f990ce1 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -27,7 +27,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, 
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
-import kafka.network.SocketServer._
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import kafka.utils._
@@ -43,7 +42,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest, 
RequestContext, Req
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, 
Reconfigurable}
-import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
+import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledException, SocketServer => JSocketServer, 
SocketServerConfigs, TooManyConnectionsException}
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
 import org.apache.kafka.server.config.QuotaConfig
@@ -92,8 +91,8 @@ class SocketServer(
   this.logIdent = logContext.logPrefix
 
   private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
-  private val memoryPoolDepletedPercentMetricName = 
metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
-  private val memoryPoolDepletedTimeMetricName = 
metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
+  private val memoryPoolDepletedPercentMetricName = 
metrics.metricName("MemoryPoolAvgDepletedPercent", JSocketServer.METRICS_GROUP)
+  private val memoryPoolDepletedTimeMetricName = 
metrics.metricName("MemoryPoolDepletedTimeTotal", JSocketServer.METRICS_GROUP)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, 
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
@@ -117,7 +116,7 @@ class SocketServer(
   metricsGroup.newGauge(s"NetworkProcessorAvgIdlePercent", () => 
SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
     val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+      metrics.metricName("io-wait-ratio", JSocketServer.METRICS_GROUP, 
p.metricTags)
     }
     if (dataPlaneProcessors.isEmpty) {
       1.0
@@ -133,7 +132,7 @@ class SocketServer(
   metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () => 
SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
     val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p 
=>
-      metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
+      metrics.metricName("expired-connections-killed-count", 
JSocketServer.METRICS_GROUP, p.metricTags)
     }
     expiredConnectionsKilledCountMetricNames.map { metricName =>
       Option(metrics.metric(metricName)).fold(0.0)(m => 
m.metricValue.asInstanceOf[Double])
@@ -311,7 +310,7 @@ class SocketServer(
     }
   }
 
-  override def reconfigurableConfigs: Set[String] = 
SocketServer.ReconfigurableConfigs
+  override def reconfigurableConfigs: util.Set[String] = 
JSocketServer.RECONFIGURABLE_CONFIGS
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
 
@@ -354,23 +353,6 @@ class SocketServer(
   }
 }
 
-object SocketServer {
-  val MetricsGroup = "socket-server-metrics"
-
-  val ReconfigurableConfigs: Set[String] = Set(
-    SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
-    SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
-    SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
-    SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
-
-  val ListenerReconfigurableConfigs: Set[String] = 
Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, 
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
-
-  def closeSocket(channel: SocketChannel): Unit = {
-    Utils.closeQuietly(channel.socket, "channel socket")
-    Utils.closeQuietly(channel, "channel")
-  }
-}
-
 object DataPlaneAcceptor {
   val ListenerReconfigurableConfigs: Set[String] = 
Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
 }
@@ -613,7 +595,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
     // The serverChannel will be null if Acceptor's thread is not started
     Utils.closeQuietly(serverChannel, "Acceptor serverChannel")
     Utils.closeQuietly(nioSelector, "Acceptor nioSelector")
-    throttledSockets.foreach(throttledSocket => 
closeSocket(throttledSocket.socket))
+    throttledSockets.foreach(throttledSocket => 
JSocketServer.closeSocket(throttledSocket.socket))
     throttledSockets.clear()
   }
 
@@ -720,7 +702,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
     while (throttledSockets.headOption.exists(_.endThrottleTimeMs < timeMs)) {
       val closingSocket = throttledSockets.dequeue()
       debug(s"Closing socket from ip ${closingSocket.socket.getRemoteAddress}")
-      closeSocket(closingSocket.socket)
+      JSocketServer.closeSocket(closingSocket.socket)
     }
   }
 
@@ -852,7 +834,7 @@ private[kafka] class Processor(
   ).asJava
 
   metricsGroup.newGauge(IdlePercentMetricName, () => {
-    Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, 
metricTags))).fold(0.0)(m =>
+    Option(metrics.metric(metrics.metricName("io-wait-ratio", 
JSocketServer.METRICS_GROUP, metricTags))).fold(0.0)(m =>
       Math.min(m.metricValue.asInstanceOf[Double], 1.0))
   },
     // for compatibility, only add a networkProcessor tag to the Yammer 
Metrics alias (the equivalent Selector metric
@@ -861,7 +843,7 @@ private[kafka] class Processor(
   )
 
   private val expiredConnectionsKilledCount = new CumulativeSum()
-  private val expiredConnectionsKilledCountMetricName = 
metrics.metricName("expired-connections-killed-count", MetricsGroup, metricTags)
+  private val expiredConnectionsKilledCountMetricName = 
metrics.metricName("expired-connections-killed-count", 
JSocketServer.METRICS_GROUP, metricTags)
   metrics.addMetric(expiredConnectionsKilledCountMetricName, 
expiredConnectionsKilledCount)
 
   private[network] val selector = createSelector(
@@ -1352,7 +1334,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: 
Option[Int]): Unit = synchronized {
     def isIpConnectionRateMetric(metricName: MetricName) = {
       metricName.name == ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME &&
-      metricName.group == MetricsGroup &&
+      metricName.group == JSocketServer.METRICS_GROUP &&
       metricName.tags.containsKey(ConnectionQuotaEntity.IP_METRIC_TAG)
     }
 
@@ -1620,7 +1602,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private def connectionRateMetricName(connectionQuotaEntity: 
ConnectionQuotaEntity): MetricName = {
     metrics.metricName(
       connectionQuotaEntity.metricName,
-      MetricsGroup,
+      JSocketServer.METRICS_GROUP,
       s"Tracking rate of accepting new connections (per second)",
       connectionQuotaEntity.metricTags)
   }
@@ -1653,7 +1635,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     }
 
     override def reconfigurableConfigs(): util.Set[String] = {
-      SocketServer.ListenerReconfigurableConfigs.asJava
+      JSocketServer.LISTENER_RECONFIGURABLE_CONFIGS
     }
 
     override def validateReconfiguration(configs: util.Map[String, _]): Unit = 
{
@@ -1696,7 +1678,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     private def createConnectionRateThrottleSensor(throttlePrefix: String): 
Sensor = {
       val sensor = 
metrics.sensor(s"${throttlePrefix}ConnectionRateThrottleTime-${listener.value}")
       val metricName = 
metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
-        MetricsGroup,
+        JSocketServer.METRICS_GROUP,
         "Tracking average throttle-time, out of non-zero throttle times, per 
listener",
         Map(ListenerMetricTag -> listener.value).asJava)
       sensor.add(metricName, new Avg)
@@ -1711,7 +1693,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
     if (channel != null) {
       log.debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress}")
       dec(listenerName, channel.socket.getInetAddress)
-      closeSocket(channel)
+      JSocketServer.closeSocket(channel)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala 
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 743937b54fc..5c32c14eb2d 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -35,7 +35,7 @@ import 
org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig
-import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.config.{DynamicBrokerConfig, 
ServerTopicConfigSynonyms}
 import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
@@ -246,7 +246,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
       .filter(perBrokerConfig || _.source == 
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id)
     val synonyms = if (!includeSynonyms) List.empty else allSynonyms
     val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else 
allSynonyms.head.source
-    val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
+    val readOnly = !DynamicBrokerConfig.ALL_DYNAMIC_CONFIGS.contains(name)
 
     val dataType = configResponseType(configEntryType)
     val configDocumentation = if (includeDocumentation) 
brokerDocumentation(name) else null
@@ -274,7 +274,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
   }
 
   private def brokerSynonyms(name: String): List[String] = {
-    DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = 
true)
+    DynamicBrokerConfig.brokerConfigSynonyms(name, true).asScala.toList
   }
 
   private def brokerDocumentation(name: String): String = {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 27156831b25..a467f18ace7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,14 +22,13 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.log.LogManager
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.DataPlaneAcceptor
 import kafka.raft.KafkaRaftManager
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.Logging
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource, SslConfigs}
 import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
 import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@@ -37,20 +36,17 @@ import 
org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.LogContext
 import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
 import org.apache.kafka.config
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.network.SocketServer
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{DynamicConfig, 
DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, 
DynamicBrokerConfig => JDynamicBrokerConfig}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
MetricConfigs}
 import org.apache.kafka.server.telemetry.{ClientTelemetry, 
ClientTelemetryExporterProvider}
 import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
 import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}
+import org.apache.kafka.storage.internals.log.LogConfig
 
 import scala.util.Using
 import scala.collection._
@@ -90,117 +86,8 @@ import scala.jdk.CollectionConverters._
   */
 object DynamicBrokerConfig {
 
-  private[server] val DynamicSecurityConfigs = 
SslConfigs.RECONFIGURABLE_CONFIGS.asScala
-  private[server] val DynamicProducerStateManagerConfig = 
Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
-
-  val AllDynamicConfigs = DynamicSecurityConfigs ++
-    LogCleaner.RECONFIGURABLE_CONFIGS.asScala ++
-    DynamicLogConfig.ReconfigurableConfigs ++
-    DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala ++
-    Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++
-    DynamicListenerConfig.ReconfigurableConfigs ++
-    SocketServer.ReconfigurableConfigs ++
-    DynamicProducerStateManagerConfig ++
-    DynamicRemoteLogConfig.ReconfigurableConfigs ++
-    DynamicReplicationConfig.ReconfigurableConfigs ++
-    Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
-    GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
-    ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
-
-  private val ClusterLevelListenerConfigs = 
Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, 
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, 
SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
-  private val PerBrokerConfigs = (DynamicSecurityConfigs ++ 
DynamicListenerConfig.ReconfigurableConfigs).diff(
-    ClusterLevelListenerConfigs)
-  private val ListenerMechanismConfigs = Set(SaslConfigs.SASL_JAAS_CONFIG,
-    SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
-    SaslConfigs.SASL_LOGIN_CLASS,
-    BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
-    BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG)
-
   private val ReloadableFileConfigs = 
Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
 
-  private val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
-
-  def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): 
List[String] = {
-    name match {
-      case ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG | 
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG =>
-        List(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG)
-      case ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG | 
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG =>
-        List(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG)
-      case ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG => // 
KafkaLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG is used as default
-        List(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, 
ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG)
-      case ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG | 
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG | 
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG =>
-        List(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, 
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG)
-      case ListenerConfigRegex(baseName) if matchListenerOverride =>
-        // `ListenerMechanismConfigs` are specified as 
listenerPrefix.mechanism.<configName>
-        // and other listener configs are specified as 
listenerPrefix.<configName>
-        // Add <configName> as a synonym in both cases.
-        val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith)
-        List(name, mechanismConfig.getOrElse(baseName))
-      case _ => List(name)
-    }
-  }
-
-  def validateConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
-    def checkInvalidProps(invalidPropNames: Set[String], errorMessage: 
String): Unit = {
-      if (invalidPropNames.nonEmpty)
-        throw new ConfigException(s"$errorMessage: $invalidPropNames")
-    }
-    checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs 
dynamically")
-    checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
-      "These security configs can be dynamically updated only per-listener 
using the listener prefix")
-    validateConfigTypes(props)
-    if (!perBrokerConfig) {
-      checkInvalidProps(perBrokerConfigs(props),
-        "Cannot update these configs at default cluster level, broker id must 
be specified")
-    }
-  }
-
-  private def perBrokerConfigs(props: Properties): Set[String] = {
-    val configNames = props.asScala.keySet
-    def perBrokerListenerConfig(name: String): Boolean = {
-      name match {
-        case ListenerConfigRegex(baseName) => 
!ClusterLevelListenerConfigs.contains(baseName)
-        case _ => false
-      }
-    }
-    configNames.intersect(PerBrokerConfigs) ++ 
configNames.filter(perBrokerListenerConfig)
-  }
-
-  private def nonDynamicConfigs(props: Properties): Set[String] = {
-    props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
-  }
-
-  private def securityConfigsWithoutListenerPrefix(props: Properties): 
Set[String] = {
-    DynamicSecurityConfigs.filter(props.containsKey)
-  }
-
-  private def validateConfigTypes(props: Properties): Unit = {
-    val baseProps = new Properties
-    props.asScala.foreach {
-      case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
-      case (k, v) => baseProps.put(k, v)
-    }
-    DynamicConfig.Broker.validate(baseProps)
-  }
-
-  private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
-    AllDynamicConfigs.map { name =>
-      val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else 
"cluster-wide"
-      name -> mode
-    }.toMap.asJava
-  }
-
-  private[server] def resolveVariableConfigs(propsOriginal: Properties): 
Properties = {
-    val props = new Properties
-    val config = new AbstractConfig(new ConfigDef(), propsOriginal, 
Utils.castToStringObjectMap(propsOriginal), false)
-    config.originals.forEach { (key, value) =>
-      if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
-        props.put(key, value)
-      }
-    }
-    props
-  }
-
   private[server] def readDynamicBrokerConfigsFromSnapshot(
     raftManager: KafkaRaftManager[ApiMessageAndVersion],
     config: KafkaConfig,
@@ -233,7 +120,7 @@ object DynamicBrokerConfig {
             batch.forEach { record =>
               if (record.message().apiKey() == 
MetadataRecordType.CONFIG_RECORD.id) {
                 val configRecord = record.message().asInstanceOf[ConfigRecord]
-                if 
(DynamicBrokerConfig.AllDynamicConfigs.contains(configRecord.name()) &&
+                if 
(JDynamicBrokerConfig.ALL_DYNAMIC_CONFIGS.contains(configRecord.name()) &&
                   configRecord.resourceType() == 
ConfigResource.Type.BROKER.id()) {
                     if (configRecord.resourceName().isEmpty) {
                       putOrRemoveIfNull(dynamicDefaultConfigs, 
configRecord.name(), configRecord.value())
@@ -339,14 +226,14 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
-    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
     reconfigurables.add(reconfigurable)
   }
 
   def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): 
Unit = {
-    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
     brokerReconfigurables.add(new BrokerReconfigurable {
-      override def reconfigurableConfigs: Set[String] = 
reconfigurable.reconfigurableConfigs().asScala
+      override def reconfigurableConfigs: util.Set[String] = 
reconfigurable.reconfigurableConfigs
 
       override def validateReconfiguration(newConfig: KafkaConfig): Unit = 
reconfigurable.validateReconfiguration(newConfig)
 
@@ -363,8 +250,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     reconfigurables.remove(reconfigurable)
   }
 
-  private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = {
-    val nonDynamic = 
configNames.intersect(DynamicConfig.Broker.nonDynamicProps)
+  private def verifyReconfigurableConfigs(configNames: util.Set[String]): Unit 
= {
+    val nonDynamic = new util.HashSet(configNames)
+    nonDynamic.retainAll(DynamicConfig.Broker.nonDynamicProps)
     require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs 
$nonDynamic")
   }
 
@@ -436,17 +324,17 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 
     // Remove all invalid configs from `props`
     removeInvalidConfigs(props, perBrokerConfig)
-    def removeInvalidProps(invalidPropNames: Set[String], errorMessage: 
String): Unit = {
-      if (invalidPropNames.nonEmpty) {
-        invalidPropNames.foreach(props.remove)
+    def removeInvalidProps(invalidPropNames: util.Set[String], errorMessage: 
String): Unit = {
+      if (!invalidPropNames.isEmpty) {
+        invalidPropNames.forEach(name => props.remove(name))
         error(s"$errorMessage: $invalidPropNames")
       }
     }
-    removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be 
ignored")
-    removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
+    removeInvalidProps(JDynamicBrokerConfig.nonDynamicConfigs(props), 
"Non-dynamic configs will be ignored")
+    
removeInvalidProps(JDynamicBrokerConfig.securityConfigsWithoutListenerPrefix(props),
       "Security configs can be dynamically updated only using listener prefix, 
base configs will be ignored")
     if (!perBrokerConfig)
-      removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined 
at default cluster level will be ignored")
+      removeInvalidProps(JDynamicBrokerConfig.perBrokerConfigs(props), 
"Per-broker configs defined at default cluster level will be ignored")
 
     props
   }
@@ -458,8 +346,8 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
    * Note: The caller must acquire the read or write lock before invoking this 
method.
    */
   private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig: 
Boolean): Map[String, String] = {
-    val propsResolved = 
DynamicBrokerConfig.resolveVariableConfigs(propsOverride)
-    validateConfigs(propsResolved, perBrokerConfig)
+    val propsResolved = 
JDynamicBrokerConfig.resolveVariableConfigs(propsOverride)
+    JDynamicBrokerConfig.validateConfigs(propsResolved, perBrokerConfig)
     val newProps = mutable.Map[String, String]()
     newProps ++= staticBrokerConfigs
     if (perBrokerConfig) {
@@ -479,7 +367,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 
   private def removeInvalidConfigs(props: Properties, perBrokerConfig: 
Boolean): Unit = {
     try {
-      validateConfigTypes(props)
+      JDynamicBrokerConfig.validateConfigTypes(props)
       props.asScala
     } catch {
       case e: Exception =>
@@ -487,7 +375,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
           val props1 = new Properties
           props1.put(k, v)
           try {
-            validateConfigTypes(props1)
+            JDynamicBrokerConfig.validateConfigTypes(props1)
             false
           } catch {
             case _: Exception => true
@@ -532,7 +420,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       // so that base configs corresponding to listener configs are not 
removed. Base configs should not be removed
       // since they may be used by other listeners. It is ok to retain them in 
`props` since base configs cannot be
       // dynamically updated and listener-specific configs have the higher 
precedence.
-      brokerConfigSynonyms(k, matchListenerOverride = 
false).foreach(props.remove)
+      JDynamicBrokerConfig.brokerConfigSynonyms(k, false).forEach(props.remove)
       props.put(k, v)
     }
   }
@@ -572,7 +460,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
         // BrokerReconfigurable updates are processed after config is updated. 
Only do the validation here.
         val brokerReconfigurablesToUpdate = 
mutable.Buffer[BrokerReconfigurable]()
         brokerReconfigurables.forEach { reconfigurable =>
-          if 
(needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, 
changeMap.keySet, deletedKeySet)) {
+          if (needsReconfiguration(reconfigurable.reconfigurableConfigs, 
changeMap.keySet, deletedKeySet)) {
             reconfigurable.validateReconfiguration(newConfig)
             if (!validateOnly)
               brokerReconfigurablesToUpdate += reconfigurable
@@ -641,27 +529,17 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
  */
 trait BrokerReconfigurable {
 
-  def reconfigurableConfigs: Set[String]
+  def reconfigurableConfigs: util.Set[String]
 
   def validateReconfiguration(newConfig: KafkaConfig): Unit
 
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
 }
 
-object DynamicLogConfig {
-  /**
-   * The broker configurations pertaining to logs that are reconfigurable. 
This set contains
-   * the names you would use when setting a static or dynamic broker 
configuration (not topic
-   * configuration).
-   */
-  val ReconfigurableConfigs: Set[String] =
-    ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.values.toSet
-}
-
 class DynamicLogConfig(logManager: LogManager) extends BrokerReconfigurable 
with Logging {
 
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicLogConfig.ReconfigurableConfigs
+  override def reconfigurableConfigs: util.Set[String] = {
+    JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -728,8 +606,8 @@ class DynamicLogConfig(logManager: LogManager) extends 
BrokerReconfigurable with
 
 class ControllerDynamicThreadPool(controller: ControllerServer) extends 
BrokerReconfigurable {
 
-  override def reconfigurableConfigs: Set[String] = {
-    Set(ServerConfigs.NUM_IO_THREADS_CONFIG)
+  override def reconfigurableConfigs: util.Set[String] = {
+    util.Set.of(ServerConfigs.NUM_IO_THREADS_CONFIG)
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -744,8 +622,8 @@ class ControllerDynamicThreadPool(controller: 
ControllerServer) extends BrokerRe
 
 class BrokerDynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
 
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala
+  override def reconfigurableConfigs: util.Set[String] = {
+    DynamicThreadPool.RECONFIGURABLE_CONFIGS
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -880,60 +758,6 @@ class DynamicMetricReporterState(brokerId: Int, config: 
KafkaConfig, metrics: Me
   }
 }
 
-object DynamicListenerConfig {
-  /**
-   * The set of configurations which the DynamicListenerConfig object listens 
for. Many of
-   * these are also monitored by other objects such as ChannelBuilders and 
SocketServers.
-   */
-  val ReconfigurableConfigs = Set(
-    // Listener configs
-    SocketServerConfigs.LISTENERS_CONFIG,
-    SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-
-    // SSL configs
-    BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
-    SslConfigs.SSL_PROTOCOL_CONFIG,
-    SslConfigs.SSL_PROVIDER_CONFIG,
-    SslConfigs.SSL_CIPHER_SUITES_CONFIG,
-    SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
-    SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
-    SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
-    SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
-    SslConfigs.SSL_KEY_PASSWORD_CONFIG,
-    SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
-    SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
-    SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
-    SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
-    SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
-    SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
-    SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
-    BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
-    SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG,
-
-    // SASL configs
-    BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
-    SaslConfigs.SASL_JAAS_CONFIG,
-    BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
-    SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
-    SaslConfigs.SASL_KERBEROS_KINIT_CMD,
-    SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
-    SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER,
-    SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
-    BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
-    SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR,
-    SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER,
-    SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
-    SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
-
-    // Connection limit configs
-    SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
-    SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
-
-    // Network threads
-    SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG
-  )
-}
-
 class DynamicClientQuotaCallback(
   quotaManagers: QuotaFactory.QuotaManagers,
   serverConfig: KafkaConfig
@@ -974,8 +798,8 @@ class DynamicClientQuotaCallback(
 
 class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable 
with Logging {
 
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicListenerConfig.ReconfigurableConfigs
+  override def reconfigurableConfigs: util.Set[String] = {
+    JDynamicBrokerConfig.DynamicListenerConfig.RECONFIGURABLE_CONFIGS
   }
 
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -991,7 +815,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
       def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): 
Map[String, AnyRef] = {
         kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case 
(key, _) =>
           // skip the reconfigurable configs
-          !DynamicSecurityConfigs.contains(key) && 
!SocketServer.ListenerReconfigurableConfigs.contains(key) && 
!DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)
+          !JDynamicBrokerConfig.DYNAMIC_SECURITY_CONFIGS.contains(key) && 
!SocketServer.LISTENER_RECONFIGURABLE_CONFIGS.contains(key) && 
!DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)
         }
       }
       if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != 
immutableListenerConfigs(oldConfig, listenerName.configPrefix))
@@ -1022,8 +846,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
 }
 
 class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable 
with Logging {
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicRemoteLogConfig.ReconfigurableConfigs
+  override def reconfigurableConfigs: util.Set[String] = {
+    JDynamicBrokerConfig.DynamicRemoteLogConfig.RECONFIGURABLE_CONFIGS
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -1124,23 +948,9 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends 
BrokerReconfigurable w
   }
 }
 
-object DynamicRemoteLogConfig {
-  val ReconfigurableConfigs = Set(
-    RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
-    RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
-    RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
-    RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
-  )
-}
-
 class DynamicReplicationConfig(server: KafkaBroker) extends 
BrokerReconfigurable with Logging {
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicReplicationConfig.ReconfigurableConfigs
+  override def reconfigurableConfigs: util.Set[String] = {
+    JDynamicBrokerConfig.DynamicReplicationConfig.RECONFIGURABLE_CONFIGS
   }
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -1151,9 +961,3 @@ class DynamicReplicationConfig(server: KafkaBroker) 
extends BrokerReconfigurable
     // Currently it is a noop for reconfiguring the dynamic config 
follower.fetch.last.tiered.offset.enable
   }
 }
-
-object DynamicReplicationConfig {
-  val ReconfigurableConfigs = Set(
-    ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG
-  )
-}
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala 
b/core/src/main/scala/kafka/server/DynamicConfig.scala
deleted file mode 100644
index ad48b904c13..00000000000
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import kafka.server.DynamicBrokerConfig.AllDynamicConfigs
-
-import java.util.Properties
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.server.config.QuotaConfig 
-
-import java.util
-import scala.jdk.CollectionConverters._
-
-/**
-  * Class used to hold dynamic configs. These are configs which have no 
physical manifestation in the server.properties
-  * and can only be set dynamically.
-  */
-object DynamicConfig {
-    object Broker {
-      private val brokerConfigs = {
-        val configs = QuotaConfig.brokerQuotaConfigs()
-
-        // Filter and define all dynamic configurations
-        KafkaConfig.configKeys
-          .filter { case (configName, _) => 
AllDynamicConfigs.contains(configName) }
-          .foreach { case (_, config) => configs.define(config) }
-        configs
-      }
-
-    // In order to avoid circular reference, all DynamicBrokerConfig's 
variables which are initialized by `DynamicConfig.Broker` should be moved to 
`DynamicConfig.Broker`.
-    // Otherwise, those variables of DynamicBrokerConfig will see intermediate 
state of `DynamicConfig.Broker`, because `brokerConfigs` is created by 
`DynamicBrokerConfig.AllDynamicConfigs`
-    val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- 
brokerConfigs.names.asScala
-
-    def configKeys: util.Map[String, ConfigDef.ConfigKey] = 
brokerConfigs.configKeys
-
-    def names: util.Set[String] = brokerConfigs.names
-
-    def validate(props: Properties): util.Map[String, AnyRef] = 
DynamicConfig.validate(brokerConfigs, props, customPropsAllowed = true)
-  }
-
-
-  private def validate(configDef: ConfigDef, props: Properties, 
customPropsAllowed: Boolean) = {
-    // Validate Names
-    val names = configDef.names
-    val propKeys = props.keySet.asScala.map(_.asInstanceOf[String])
-    if (!customPropsAllowed) {
-      val unknownKeys = propKeys.filterNot(names.contains(_))
-      require(unknownKeys.isEmpty, s"Unknown Dynamic Configuration: 
$unknownKeys.")
-    }
-    val propResolved = DynamicBrokerConfig.resolveVariableConfigs(props)
-    // ValidateValues
-    configDef.parse(propResolved)
-  }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8c3c6094e68..740130cfac2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
-import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, DynamicConfig, 
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, 
DynamicBrokerConfig => JDynamicBrokerConfig}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.MetricConfigs
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
@@ -58,7 +58,7 @@ object KafkaConfig {
 
   def main(args: Array[String]): Unit = {
     System.out.println(configDef.toHtml(4, (config: String) => 
"brokerconfigs_" + config,
-      DynamicBrokerConfig.dynamicConfigUpdateModes))
+      JDynamicBrokerConfig.dynamicConfigUpdateModes))
   }
 
   val configDef = AbstractKafkaConfig.CONFIG_DEF
@@ -95,7 +95,7 @@ object KafkaConfig {
     typeOf(configName) match {
       case Some(t) => Some(t)
       case None =>
-        DynamicBrokerConfig.brokerConfigSynonyms(configName, 
matchListenerOverride = true).flatMap(typeOf).headOption
+        JDynamicBrokerConfig.brokerConfigSynonyms(configName, 
true).asScala.flatMap(typeOf).headOption
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala 
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 2d81c2a773b..0c0d7581e81 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.network.{ConnectionThrottledException, 
SocketServerConfigs, TooManyConnectionsException}
+import org.apache.kafka.network.{ConnectionThrottledException, SocketServer, 
SocketServerConfigs, TooManyConnectionsException}
 import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.MockTime
@@ -830,7 +830,7 @@ class ConnectionQuotasTest {
   private def listenerConnThrottleMetric(listener: String) : KafkaMetric = {
     val metricName = metrics.metricName(
       "connection-accept-throttle-time",
-      SocketServer.MetricsGroup,
+      SocketServer.METRICS_GROUP,
       util.Map.of(Processor.ListenerMetricTag, listener))
     metrics.metric(metricName)
   }
@@ -838,7 +838,7 @@ class ConnectionQuotasTest {
   private def ipConnThrottleMetric(listener: String): KafkaMetric = {
     val metricName = metrics.metricName(
       "ip-connection-accept-throttle-time",
-      SocketServer.MetricsGroup,
+      SocketServer.METRICS_GROUP,
       util.Map.of(Processor.ListenerMetricTag, listener))
     metrics.metric(metricName)
   }
@@ -846,7 +846,7 @@ class ConnectionQuotasTest {
   private def listenerConnRateMetric(listener: String) : KafkaMetric = {
     val metricName = metrics.metricName(
       "connection-accept-rate",
-      SocketServer.MetricsGroup,
+      SocketServer.METRICS_GROUP,
       util.Map.of(Processor.ListenerMetricTag, listener))
     metrics.metric(metricName)
   }
@@ -854,14 +854,14 @@ class ConnectionQuotasTest {
   private def brokerConnRateMetric() : KafkaMetric = {
     val metricName = metrics.metricName(
       s"broker-connection-accept-rate",
-      SocketServer.MetricsGroup)
+      SocketServer.METRICS_GROUP)
     metrics.metric(metricName)
   }
 
   private def ipConnRateMetric(ip: String): KafkaMetric = {
     val metricName = metrics.metricName(
       s"connection-accept-rate",
-      SocketServer.MetricsGroup,
+      SocketServer.METRICS_GROUP,
       util.Map.of("ip", ip))
     metrics.metric(metricName)
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 431809129c9..ea69a599a15 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.network.{SocketServer => JSocketServer, 
SocketServerConfigs}
 import org.apache.kafka.server.DynamicThreadPool
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
@@ -326,7 +326,7 @@ class DynamicBrokerConfigTest {
     config.dynamicConfig.removeReconfigurable(reconfigurable)
 
     val brokerReconfigurable = new BrokerReconfigurable {
-      override def reconfigurableConfigs: collection.Set[String] = 
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP)
+      override def reconfigurableConfigs: util.Set[String] = 
util.Set.of(CleanerConfig.LOG_CLEANER_THREADS_PROP)
       override def validateReconfiguration(newConfig: KafkaConfig): Unit = 
validateLogCleanerConfig(newConfig.originals)
       override def reconfigure(oldConfig: KafkaConfig, newConfig: 
KafkaConfig): Unit = {}
     }
@@ -351,7 +351,7 @@ class DynamicBrokerConfigTest {
     
config.dynamicConfig.addReconfigurable(createReconfigurable(validReconfigurableProps))
 
     def createBrokerReconfigurable(configs: Set[String]) = new 
BrokerReconfigurable {
-      override def reconfigurableConfigs: collection.Set[String] = configs
+      override def reconfigurableConfigs: util.Set[String] = configs.asJava
       override def validateReconfiguration(newConfig: KafkaConfig): Unit = {}
       override def reconfigure(oldConfig: KafkaConfig, newConfig: 
KafkaConfig): Unit = {}
     }
@@ -510,7 +510,7 @@ class DynamicBrokerConfigTest {
     when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
     when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
-    
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+    
when(socketServer.reconfigurableConfigs).thenReturn(JSocketServer.RECONFIGURABLE_CONFIGS)
     when(kafkaServer.socketServer).thenReturn(socketServer)
     val logManager: LogManager = mock(classOf[LogManager])
     val producerStateManagerConfig: ProducerStateManagerConfig = 
mock(classOf[ProducerStateManagerConfig])
@@ -557,7 +557,7 @@ class DynamicBrokerConfigTest {
     when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
     when(controllerServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
-    
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+    
when(socketServer.reconfigurableConfigs).thenReturn(JSocketServer.RECONFIGURABLE_CONFIGS)
     when(controllerServer.socketServer).thenReturn(socketServer)
 
     val authorizer = new TestAuthorizer
@@ -603,7 +603,7 @@ class DynamicBrokerConfigTest {
     when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
     when(controllerServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
-    
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+    
when(socketServer.reconfigurableConfigs).thenReturn(JSocketServer.RECONFIGURABLE_CONFIGS)
     when(controllerServer.socketServer).thenReturn(socketServer)
 
     val authorizer = new TestAuthorizer
@@ -616,18 +616,6 @@ class DynamicBrokerConfigTest {
     assertEquals("User:admin", authorizer.superUsers)
   }
 
-  @Test
-  def testSynonyms(): Unit = {
-    assertEquals(List("listener.name.secure.ssl.keystore.type", 
"ssl.keystore.type"),
-      
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type",
 matchListenerOverride = true))
-    assertEquals(List("listener.name.sasl_ssl.plain.sasl.jaas.config", 
"sasl.jaas.config"),
-      
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config",
 matchListenerOverride = true))
-    assertEquals(List("some.config"),
-      DynamicBrokerConfig.brokerConfigSynonyms("some.config", 
matchListenerOverride = true))
-    assertEquals(List(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG),
-      
DynamicBrokerConfig.brokerConfigSynonyms(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
 matchListenerOverride = true))
-  }
-
   @Test
   def testImproperConfigsAreRemoved(): Unit = {
     val props = TestUtils.createBrokerConfig(0)
@@ -1153,8 +1141,8 @@ class DynamicBrokerConfigTest {
 
 class TestDynamicThreadPool extends BrokerReconfigurable {
 
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala
+  override def reconfigurableConfigs: util.Set[String] = {
+    DynamicThreadPool.RECONFIGURABLE_CONFIGS
   }
 
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
diff --git a/server/src/main/java/org/apache/kafka/network/SocketServer.java 
b/server/src/main/java/org/apache/kafka/network/SocketServer.java
new file mode 100644
index 00000000000..582ba5c3cf6
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/SocketServer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+public class SocketServer {
+
+    public static final String METRICS_GROUP = "socket-server-metrics";
+
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+            SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
+            SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
+            SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+            SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+
+    public static final Set<String> LISTENER_RECONFIGURABLE_CONFIGS = Set.of(
+            SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+            SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+
+    public static void closeSocket(SocketChannel channel) {
+        Utils.closeQuietly(channel.socket(), "channel socket");
+        Utils.closeQuietly(channel, "channel");
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
new file mode 100644
index 00000000000..a45c65a9b9b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.network.SocketServer;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.DynamicThreadPool;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.storage.internals.log.LogCleaner;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class DynamicBrokerConfig {
+
+    public static final Set<String> DYNAMIC_SECURITY_CONFIGS = 
SslConfigs.RECONFIGURABLE_CONFIGS;
+
+    private static final Set<String> DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS = 
Set.of(
+            TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
+            
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+
+    private static final Set<String> CLUSTER_LEVEL_LISTENER_CONFIGS = Set.of(
+            SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+            SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
+            SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);
+
+    private static final Set<String> PER_BROKER_CONFIGS = Stream.of(
+            DYNAMIC_SECURITY_CONFIGS,
+            DynamicListenerConfig.RECONFIGURABLE_CONFIGS)
+        .flatMap(Collection::stream)
+        .filter(c -> !CLUSTER_LEVEL_LISTENER_CONFIGS.contains(c))
+        .collect(Collectors.toUnmodifiableSet());
+
+    public static final Set<String> ALL_DYNAMIC_CONFIGS = Stream.of(
+            DYNAMIC_SECURITY_CONFIGS,
+            LogCleaner.RECONFIGURABLE_CONFIGS,
+            DynamicLogConfig.RECONFIGURABLE_CONFIGS,
+            DynamicThreadPool.RECONFIGURABLE_CONFIGS,
+            List.of(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG),
+            DynamicListenerConfig.RECONFIGURABLE_CONFIGS,
+            SocketServer.RECONFIGURABLE_CONFIGS,
+            DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS,
+            DynamicRemoteLogConfig.RECONFIGURABLE_CONFIGS,
+            DynamicReplicationConfig.RECONFIGURABLE_CONFIGS,
+            List.of(AbstractConfig.CONFIG_PROVIDERS_CONFIG),
+            GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS,
+            ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toUnmodifiableSet());
+
+    private static final Set<String> LISTENER_MECHANISM_CONFIGS = Set.of(
+            SaslConfigs.SASL_JAAS_CONFIG,
+            SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+            SaslConfigs.SASL_LOGIN_CLASS,
+            BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
+            BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG);
+
+    private static final Pattern LISTENER_CONFIG_REGEX = 
Pattern.compile("listener\\.name\\.[^.]*\\.(.*)");
+
+    public static List<String> brokerConfigSynonyms(String name, boolean 
matchListenerOverride) {
+        List<String> logRollConfigs = 
List.of(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG);
+        List<String> logRollJitterConfigs = 
List.of(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG);
+        List<String> logRetentionConfigs = 
List.of(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, 
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG);
+        List<String> logFlushConfigs = 
List.of(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, 
ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+        if (logRollConfigs.contains(name)) {
+            return logRollConfigs;
+        } else if (logRollJitterConfigs.contains(name)) {
+            return logRollJitterConfigs;
+        } else if (ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG.equals(name)) 
{ // KafkaLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG is used as default
+            return logFlushConfigs;
+        } else if (logRetentionConfigs.contains(name)) {
+            return logRetentionConfigs;
+        } else if (matchListenerOverride) {
+            Matcher matcher = LISTENER_CONFIG_REGEX.matcher(name);
+            if (matcher.matches()) {
+                String baseName = matcher.group(1);
+                // `ListenerMechanismConfigs` are specified as 
listenerPrefix.mechanism.<configName>
+                // and other listener configs are specified as 
listenerPrefix.<configName>
+                // Add <configName> as a synonym in both cases.
+                Optional<String> mechanismConfig = 
LISTENER_MECHANISM_CONFIGS.stream().filter(baseName::endsWith).findFirst();
+                return List.of(name, mechanismConfig.orElse(baseName));
+            }
+        }
+        return List.of(name);
+    }
+
+    private static void checkInvalidProps(Set<String> invalidPropNames, String 
errorMessage) {
+        if (!invalidPropNames.isEmpty()) {
+            throw new ConfigException(errorMessage + ": " + invalidPropNames);
+        }
+    }
+
+    public static void validateConfigs(Properties props, boolean 
perBrokerConfig) {
+        checkInvalidProps(nonDynamicConfigs(props), "Cannot update these 
configs dynamically");
+        checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
+                "These security configs can be dynamically updated only 
per-listener using the listener prefix");
+        validateConfigTypes(props);
+        if (!perBrokerConfig) {
+            checkInvalidProps(perBrokerConfigs(props),
+                    "Cannot update these configs at default cluster level, 
broker id must be specified");
+        }
+    }
+
+    public static Set<String> securityConfigsWithoutListenerPrefix(Properties 
props) {
+        return 
DYNAMIC_SECURITY_CONFIGS.stream().filter(props::containsKey).collect(Collectors.toSet());
+    }
+
+    public static void validateConfigTypes(Properties props) {
+        Properties baseProps = new Properties();
+        props.forEach((name, value) -> {
+            Matcher matcher = LISTENER_CONFIG_REGEX.matcher((String) name);
+            if (matcher.matches()) {
+                String baseName = matcher.group(1);
+                baseProps.put(baseName, value);
+            } else {
+                baseProps.put(name, value);
+            }
+        });
+        DynamicConfig.Broker.validate(baseProps);
+    }
+
+    public static Set<String> perBrokerConfigs(Properties props) {
+        Set<String> configNames = props.stringPropertyNames();
+        Set<String> perBrokerConfigs = new HashSet<>();
+        for (String name : configNames) {
+            if (PER_BROKER_CONFIGS.contains(name)) {
+                perBrokerConfigs.add(name);
+            } else {
+                Matcher matcher = LISTENER_CONFIG_REGEX.matcher(name);
+                if (matcher.matches()) {
+                    String baseName = matcher.group(1);
+                    if (!CLUSTER_LEVEL_LISTENER_CONFIGS.contains(baseName)) {
+                        perBrokerConfigs.add(name);
+                    }
+                }
+            }
+        }
+        return perBrokerConfigs;
+    }
+
+    public static Set<String> nonDynamicConfigs(Properties props) {
+        Set<String> nonDynamicConfigs = new 
HashSet<>(props.stringPropertyNames());
+        nonDynamicConfigs.retainAll(DynamicConfig.Broker.nonDynamicProps());
+        return nonDynamicConfigs;
+    }
+
+    public static Properties resolveVariableConfigs(Properties propsOriginal) {
+        Properties props = new Properties();
+        AbstractConfig config = new AbstractConfig(new ConfigDef(), 
propsOriginal, Utils.castToStringObjectMap(propsOriginal), false);
+        config.originals().forEach((key, value) -> {
+            if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
+                props.put(key, value);
+            }
+        });
+        return props;
+    }
+
+    public static Map<String, String> dynamicConfigUpdateModes() {
+        return ALL_DYNAMIC_CONFIGS.stream().collect(Collectors.toMap(
+                Function.identity(),
+                name -> PER_BROKER_CONFIGS.contains(name) ? "per-broker" : 
"cluster-wide"
+            )
+        );
+    }
+
+    public static class DynamicLogConfig {
+        /**
+         * The broker configurations pertaining to logs that are 
reconfigurable. This set contains
+         * the names you would use when setting a static or dynamic broker 
configuration (not topic
+         * configuration).
+         */
+        public static final Set<String> RECONFIGURABLE_CONFIGS = Set.copyOf(
+                ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values());
+    }
+
+    public static class DynamicListenerConfig {
+        /**
+         * The set of configurations which the DynamicListenerConfig object 
listens for. Many of
+         * these are also monitored by other objects such as ChannelBuilders 
and SocketServers.
+         */
+        public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+                // Listener configs
+                SocketServerConfigs.LISTENERS_CONFIG,
+                SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+
+                // SSL configs
+                BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
+                SslConfigs.SSL_PROTOCOL_CONFIG,
+                SslConfigs.SSL_PROVIDER_CONFIG,
+                SslConfigs.SSL_CIPHER_SUITES_CONFIG,
+                SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
+                SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
+                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+                SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+                SslConfigs.SSL_KEY_PASSWORD_CONFIG,
+                SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
+                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+                SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
+                SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
+                SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
+                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
+                SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
+                BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
+                SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG,
+
+                // SASL configs
+                
BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
+                SaslConfigs.SASL_JAAS_CONFIG,
+                BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
+                SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
+                SaslConfigs.SASL_KERBEROS_KINIT_CMD,
+                SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
+                SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER,
+                SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
+                
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
+                SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR,
+                SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER,
+                SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
+                SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
+
+                // Connection limit configs
+                SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+                SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
+
+                // Network threads
+                SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);
+    }
+
+    public static class DynamicRemoteLogConfig {
+        public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+                
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+                RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+                
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+                
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
+                RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP);
+    }
+
+    public static class DynamicReplicationConfig {
+        public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+                
ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java
new file mode 100644
index 00000000000..e1e7ccfd245
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Class used to hold dynamic configs. These are configs which have no 
physical manifestation in the server.properties
+ * and can only be set dynamically.
+ */
+public class DynamicConfig {
+
+    public static class Broker {
+
+        private static final ConfigDef BROKER_CONFIGS;
+        static {
+            ConfigDef configs = QuotaConfig.brokerQuotaConfigs();
+            // Filter and define all dynamic configurations
+            AbstractKafkaConfig.CONFIG_DEF.configKeys().forEach((name, value) 
-> {
+                if (DynamicBrokerConfig.ALL_DYNAMIC_CONFIGS.contains(name)) {
+                    configs.define(value);
+                }
+            });
+            BROKER_CONFIGS = configs;
+        }
+
+        // In order to avoid circular reference, all DynamicBrokerConfig's 
variables which are initialized by `DynamicConfig.Broker` should be moved to 
`DynamicConfig.Broker`.
+        // Otherwise, those variables of DynamicBrokerConfig will see 
intermediate state of `DynamicConfig.Broker`, because `BROKER_CONFIGS` is 
created by `DynamicBrokerConfig.ALL_DYNAMIC_CONFIGS`
+        public static Set<String> nonDynamicProps() {
+            Set<String> nonDynamicProps = new 
HashSet<>(AbstractKafkaConfig.CONFIG_DEF.names());
+            nonDynamicProps.removeAll(BROKER_CONFIGS.names());
+            return nonDynamicProps;
+        }
+
+        public static Map<String, ConfigDef.ConfigKey> configKeys() {
+            return BROKER_CONFIGS.configKeys();
+        }
+
+        public static Set<String> names() {
+            return BROKER_CONFIGS.names();
+        }
+
+        public static Map<String, Object> validate(Properties props) {
+            // Validate Names
+            Properties propResolved = 
DynamicBrokerConfig.resolveVariableConfigs(props);
+            // ValidateValues
+            return BROKER_CONFIGS.parse(propResolved);
+        }
+    }
+
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/config/DynamicBrokerConfigTest.java
 
b/server/src/test/java/org/apache/kafka/server/config/DynamicBrokerConfigTest.java
new file mode 100644
index 00000000000..1306185f7a9
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/config/DynamicBrokerConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicBrokerConfigTest {
+
+    @Test
+    public void testBrokerConfigSynonyms() {
+        List<String> logRollTimeConfigs = 
List.of(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG);
+        for (String config : logRollTimeConfigs) {
+            assertEquals(logRollTimeConfigs, 
DynamicBrokerConfig.brokerConfigSynonyms(config, false));
+        }
+        List<String> logRollJitterConfigs = 
List.of(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, 
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG);
+        for (String config : logRollJitterConfigs) {
+            assertEquals(logRollJitterConfigs, 
DynamicBrokerConfig.brokerConfigSynonyms(config, false));
+        }
+        List<String> logFlushConfigs = 
List.of(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, 
ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+        assertEquals(logFlushConfigs, 
DynamicBrokerConfig.brokerConfigSynonyms(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG,
 false));
+        List<String> logRetentionConfigs = 
List.of(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, 
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG);
+        for (String config : logRetentionConfigs) {
+            assertEquals(logRetentionConfigs, 
DynamicBrokerConfig.brokerConfigSynonyms(config, false));
+        }
+
+        assertEquals(List.of("listener.name.secure.ssl.keystore.type", 
"ssl.keystore.type"),
+                
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type",
 true));
+        assertEquals(List.of("listener.name.sasl_ssl.plain.sasl.jaas.config", 
"sasl.jaas.config"),
+                
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config",
 true));
+        assertEquals(List.of("some.config"),
+                DynamicBrokerConfig.brokerConfigSynonyms("some.config", true));
+
+        assertEquals(List.of("listener.name.NAME.CONFIG", "CONFIG"), 
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.NAME.CONFIG", true));
+        assertEquals(List.of("listener.name.NAME.CONFIG"), 
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.NAME.CONFIG", false));
+        assertEquals(List.of("listener.name.CONFIG"), 
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.CONFIG", true));
+        assertEquals(List.of("listener.name.CONFIG"), 
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.CONFIG", false));
+
+        assertEquals(List.of("anything"), 
DynamicBrokerConfig.brokerConfigSynonyms("anything", false));
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 9800d9125eb..c221ba05ab1 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -101,7 +101,7 @@ public class ConfigCommandIntegrationTest {
             "--entity-type", "brokers",
             "--alter",
             "--add-config", "security.inter.broker.protocol=PLAINTEXT")),
-            errOut -> assertTrue(errOut.contains("Cannot update these configs 
dynamically: Set(security.inter.broker.protocol)"), errOut));
+            errOut -> assertTrue(errOut.contains("Cannot update these configs 
dynamically: [security.inter.broker.protocol]"), errOut));
     }
 
     @ClusterTest

Reply via email to