This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c40e7a13414 KAFKA-18533 Remove KafkaConfig zookeeper related logic (#18547)
c40e7a13414 is described below

commit c40e7a13414dd1ae9941d9be5d92f3452d971fa9
Author: Ken Huang <[email protected]>
AuthorDate: Sat Jan 25 22:52:21 2025 +0800

    KAFKA-18533 Remove KafkaConfig zookeeper related logic (#18547)
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 17 +++---------
 core/src/main/scala/kafka/server/Server.scala      |  9 +------
 .../api/DescribeAuthorizedOperationsTest.scala     |  3 +--
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  6 ++---
 .../kafka/api/SaslMultiMechanismConsumerTest.scala |  2 --
 .../kafka/api/SaslSslConsumerTest.scala            |  2 --
 .../kafka/server/DynamicBrokerConfigTest.scala     |  4 +--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 30 ++++++++++------------
 .../kafka/server/KafkaMetricsReporterTest.scala    |  3 ---
 .../test/scala/unit/kafka/server/ServerTest.scala  |  2 +-
 .../org/apache/kafka/server/config/ZkConfigs.java  | 25 ------------------
 11 files changed, 25 insertions(+), 78 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e243d40bbb2..6aea088d8c6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.{AbstractKafkaConfig, 
DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, 
DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.MetricConfigs
 import org.apache.kafka.server.util.Csv
@@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, 
AnyRef] =
     super.valuesWithPrefixOverride(prefix)
 
-  /** ********* Zookeeper Configuration ***********/
-  val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG)
-  val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)
-  val zkConnectionTimeoutMs: Int =
-    
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG))
-  val zkEnableSecureAcls: Boolean = 
getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
-  val zkMaxInFlightRequests: Int = 
getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
-
   private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
   def remoteLogManagerConfig = _remoteLogManagerConfig
 
@@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val controllerPerformanceSamplePeriodMs: Long = 
getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
   val controllerPerformanceAlwaysLogThresholdMs: Long = 
getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)
 
-  def requiresZookeeper: Boolean = processRoles.isEmpty
-  def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
-
   private def parseProcessRoles(): Set[ProcessRole] = {
     val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
       case "broker" => ProcessRole.BrokerRole
@@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       .map { case (listenerName, protocolName) =>
         ListenerName.normalised(listenerName) -> 
getSecurityProtocol(protocolName, 
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
       }
-    if (usesSelfManagedQuorum && 
!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
 {
+    if 
(!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
 {
       // Nothing was specified explicitly for listener.security.protocol.map, 
so we are using the default value,
       // and we are using KRaft.
       // Add PLAINTEXT mappings for controller listeners as long as there is 
no SSL or SASL_{PLAINTEXT,SSL} in use
@@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
     val listenerNames = listeners.map(_.listenerName).toSet
     if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) 
{
-      // validations for all broker setups (i.e. ZooKeeper and KRaft 
broker-only and KRaft co-located)
+      // validations for all broker setups (i.e. broker-only and co-located)
       validateAdvertisedBrokerListenersNonEmptyForBroker()
       require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
         s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a 
listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
diff --git a/core/src/main/scala/kafka/server/Server.scala 
b/core/src/main/scala/kafka/server/Server.scala
index d85060cc72d..b2b2e21898a 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -33,7 +33,6 @@ trait Server {
 object Server {
   val MetricsPrefix: String = "kafka.server"
   val ClusterIdLabel: String = "kafka.cluster.id"
-  val BrokerIdLabel: String = "kafka.broker.id"
   val NodeIdLabel: String = "kafka.node.id"
 
   def initializeMetrics(
@@ -69,13 +68,7 @@ object Server {
   ): KafkaMetricsContext = {
     val contextLabels = new java.util.HashMap[String, Object]
     contextLabels.put(ClusterIdLabel, clusterId)
-
-    if (config.usesSelfManagedQuorum) {
-      contextLabels.put(NodeIdLabel, config.nodeId.toString)
-    } else {
-      contextLabels.put(BrokerIdLabel, config.brokerId.toString)
-    }
-
+    contextLabels.put(NodeIdLabel, config.nodeId.toString)
     
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
     new KafkaMetricsContext(MetricsPrefix, contextLabels)
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 0f23b93e31c..c7426c0d78e 100644
--- 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
+import org.apache.kafka.server.config.ServerConfigs
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 
@@ -79,7 +79,6 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
   import DescribeAuthorizedOperationsTest._
 
   override val brokerCount = 1
-  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
   this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
classOf[StandardAuthorizer].getName)
 
   var client: Admin = _
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index b4e9d984129..e54b24cc08a 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -53,7 +53,7 @@ import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, 
ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogFileUtils}
 import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, 
assertFutureThrows}
 import org.apache.logging.log4j.core.config.Configurator
@@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest {
       new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4"), OpType.SET)
     ))
     alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET)))
-    alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
+    alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
     var alterResult = admin.incrementalAlterConfigs(alterConfigs)
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
@@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest {
       new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4"), OpType.SET)
     ))
     alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET)))
-    alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
+    alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
     alterResult = admin.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 30a33c2ab64..b41ccb6316c 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -15,7 +15,6 @@ package kafka.api
 import kafka.security.JaasTestUtils
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
@@ -26,7 +25,6 @@ import scala.jdk.CollectionConverters._
 class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
   private val kafkaClientSaslMechanism = "PLAIN"
   private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
-  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
   override protected val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 460ebe2cb4e..22c3077f4f9 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -15,12 +15,10 @@ package kafka.api
 import kafka.security.JaasTestUtils
 import kafka.utils.TestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 
 @Timeout(600)
 class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
-  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 10b42f96b4e..3e77c76d9b4 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
 import org.apache.kafka.server.util.KafkaScheduler
@@ -253,7 +253,7 @@ class DynamicBrokerConfigTest {
 
     val securityPropsWithoutListenerPrefix = 
Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, 
securityPropsWithoutListenerPrefix)
-    val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_CONFIG -> "somehost:2181")
+    val nonDynamicProps = Map(KRaftConfigs.NODE_ID_CONFIG -> "123")
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, 
nonDynamicProps)
 
     // Test update of configs with invalid type
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 31c192ff9ef..f937ea6c81d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -49,6 +49,17 @@ import org.junit.jupiter.api.function.Executable
 import scala.jdk.CollectionConverters._
 
 class KafkaConfigTest {
+  
+  def createDefaultConfig(): Properties = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:5000")
+    
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
+    props
+  }
 
   @Test
   def testLogRetentionTimeHoursProvided(): Unit = {
@@ -547,9 +558,7 @@ class KafkaConfigTest {
 
   @Test
   def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
-    val props = new Properties()
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    val props = createDefaultConfig()
 
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"SSL://localhost:9091,REPLICATION://localhost:9092")
     props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, 
"SSL")
@@ -558,9 +567,7 @@ class KafkaConfigTest {
 
   @Test
   def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): 
Unit = {
-    val props = new Properties()
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    val props = createDefaultConfig()
 
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"SSL://localhost:9091")
     props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, 
"REPLICATION")
@@ -569,9 +576,7 @@ class KafkaConfigTest {
 
   @Test
   def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
-    val props = new Properties()
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    val props = createDefaultConfig()
 
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"SSL://localhost:9091")
     props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, 
"SSL")
@@ -794,11 +799,6 @@ class KafkaConfigTest {
 
     KafkaConfig.configNames.foreach { name =>
       name match {
-        case ZkConfigs.ZK_CONNECT_CONFIG => // ignore string
-        case ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
-        case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
         case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
         case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG =>  //ignore string
         case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG =>  //ignore string
@@ -1181,7 +1181,6 @@ class KafkaConfigTest {
     defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, 
"CONTROLLER://localhost:9092")
     defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
     // For ZkConnectionTimeoutMs
-    defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
     defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, 
"false")
     defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1")
     defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
@@ -1198,7 +1197,6 @@ class KafkaConfigTest {
     defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, 
Sensor.RecordingLevel.DEBUG.toString)
 
     val config = KafkaConfig.fromProps(defaults)
-    assertEquals(1234, config.zkConnectionTimeoutMs)
     assertEquals(false, config.brokerIdGenerationEnable)
     assertEquals(1, config.maxReservedBrokerId)
     assertEquals(1, config.brokerId)
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index e07ae3032ca..f7e729740a7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -45,7 +45,6 @@ object KafkaMetricsReporterTest {
 
       MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", 
metricsContext))
       MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", 
metricsContext))
-      MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", 
metricsContext))
       MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", 
metricsContext))
     }
 
@@ -58,7 +57,6 @@ object KafkaMetricsReporterTest {
 
   object MockMetricsReporter {
     val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
-    val BROKERID : AtomicReference[String] = new AtomicReference[String]
     val NODEID : AtomicReference[String] = new AtomicReference[String]
     val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
   }
@@ -84,7 +82,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
   @ValueSource(strings = Array("kraft"))
   def testMetricsContextNamespacePresent(quorum: String): Unit = {
     assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
-    assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
     assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
     assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index 4b2b900b375..5b60d3e08f8 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
 class ServerTest {
 
   @Test
-  def testCreateSelfManagedKafkaMetricsContext(): Unit = {
+  def testCreateKafkaMetricsContext(): Unit = {
     val nodeId = 0
     val clusterId = Uuid.randomUuid().toString
 
diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java 
b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
index 0fd251edd16..b3b1b06911c 100644
--- a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
@@ -22,23 +22,15 @@ import org.apache.kafka.common.config.ConfigDef;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
-import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
 import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
 import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
 
 public final class ZkConfigs {
     /** ********* Zookeeper Configuration ***********/
-    public static final String ZK_CONNECT_CONFIG = "zookeeper.connect";
-    public static final String ZK_SESSION_TIMEOUT_MS_CONFIG = 
"zookeeper.session.timeout.ms";
-    public static final String ZK_CONNECTION_TIMEOUT_MS_CONFIG = 
"zookeeper.connection.timeout.ms";
-    public static final String ZK_ENABLE_SECURE_ACLS_CONFIG = 
"zookeeper.set.acl";
-    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG = 
"zookeeper.max.in.flight.requests";
     public static final String ZK_SSL_CLIENT_ENABLE_CONFIG = 
"zookeeper.ssl.client.enable";
     public static final String ZK_CLIENT_CNXN_SOCKET_CONFIG = 
"zookeeper.clientCnxnSocket";
     public static final String ZK_SSL_KEY_STORE_LOCATION_CONFIG = 
"zookeeper.ssl.keystore.location";
@@ -54,15 +46,6 @@ public final class ZkConfigs {
     public static final String ZK_SSL_CRL_ENABLE_CONFIG = 
"zookeeper.ssl.crl.enable";
     public static final String ZK_SSL_OCSP_ENABLE_CONFIG = 
"zookeeper.ssl.ocsp.enable";
 
-    public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper 
connection string in the form <code>hostname:port</code> where host and port 
are the " +
-        "host and port of a ZooKeeper server. To allow connecting through 
other ZooKeeper nodes when that ZooKeeper machine is " +
-        "down you can also specify multiple hosts in the form 
<code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
-        "The server can also have a ZooKeeper chroot path as part of its 
ZooKeeper connection string which puts its data under some path in the global 
ZooKeeper namespace. " +
-        "For example to give a chroot path of <code>/chroot/path</code> you 
would give the connection string as 
<code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>.";
-    public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session 
timeout";
-    public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time 
that the client waits to establish a connection to ZooKeeper. If not set, the 
value in " + ZK_SESSION_TIMEOUT_MS_CONFIG + " is used";
-    public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use 
secure ACLs";
-    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum 
number of unacknowledged requests the client will send to ZooKeeper before 
blocking.";
     public static final String ZK_SSL_CLIENT_ENABLE_DOC;
     public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
     public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
@@ -81,9 +64,6 @@ public final class ZkConfigs {
     // a map from the Kafka config to the corresponding ZooKeeper Java system 
property
     public static final Map<String, String> 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
 
-    public static final int ZK_SESSION_TIMEOUT_MS = 18000;
-    public static final boolean ZK_ENABLE_SECURE_ACLS = false;
-    public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
     public static final boolean ZK_SSL_CLIENT_ENABLE = false;
     public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
     public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = 
"HTTPS";
@@ -152,11 +132,6 @@ public final class ZkConfigs {
     }
 
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
-            .define(ZK_CONNECT_CONFIG, STRING, null, HIGH, ZK_CONNECT_DOC)
-            .define(ZK_SESSION_TIMEOUT_MS_CONFIG, INT, ZK_SESSION_TIMEOUT_MS, 
HIGH, ZK_SESSION_TIMEOUT_MS_DOC)
-            .define(ZK_CONNECTION_TIMEOUT_MS_CONFIG, INT, null, HIGH, 
ZK_CONNECTION_TIMEOUT_MS_DOC)
-            .define(ZK_ENABLE_SECURE_ACLS_CONFIG, BOOLEAN, 
ZK_ENABLE_SECURE_ACLS, HIGH, ZK_ENABLE_SECURE_ACLS_DOC)
-            .define(ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG, INT, 
ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
             .define(ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, 
ZK_SSL_CLIENT_ENABLE, MEDIUM, ZK_SSL_CLIENT_ENABLE_DOC)
             .define(ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, 
ZK_CLIENT_CNXN_SOCKET_DOC)
             .define(ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, 
ZK_SSL_KEY_STORE_LOCATION_DOC)

Reply via email to