This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7438f100cf4 KAFKA-14774 the removed listeners should not be 
reconfigurable (#13326)
7438f100cf4 is described below

commit 7438f100cf409a1ca178b15b6b7bcfd99e541098
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Mon Mar 27 18:48:31 2023 +0800

    KAFKA-14774 the removed listeners should not be reconfigurable (#13326)
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../main/scala/kafka/network/SocketServer.scala    |  1 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 34 ++++++++++++++++++++--
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index feca5fc68b4..cdc8ece103c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -352,6 +352,7 @@ class SocketServer(val config: KafkaConfig,
       dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
         acceptor.beginShutdown()
         acceptor.close()
+        config.removeReconfigurable(acceptor)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 940580d155b..7c6d5284d71 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -211,7 +211,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 
   // Use COWArrayList to prevent concurrent modification exception when an 
item is added by one thread to these
   // collections, while another thread is iterating over them.
-  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+  private[server] val reconfigurables = new 
CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = new 
CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = _
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 661295e280c..b08c5360d96 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,7 +32,7 @@ import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
 import kafka.log.UnifiedLog
-import kafka.network.{Processor, RequestChannel}
+import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
@@ -77,6 +77,7 @@ import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
 object DynamicBrokerReconfigurationTest {
+  val Plain = "PLAIN"
   val SecureInternal = "INTERNAL"
   val SecureExternal = "EXTERNAL"
 }
@@ -125,7 +126,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
       // Ensure that we can support multiple listeners per security protocol 
and multiple security protocols
       props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, 
$SecureExternal://localhost:0")
-      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, 
CONTROLLER:$controllerListenerSecurityProtocol")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, 
CONTROLLER:$controllerListenerSecurityProtocol")
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
       props.put(KafkaConfig.SslClientAuthProp, "requested")
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
@@ -1167,6 +1168,35 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     assertTrue(partitions.exists(_.leader == null), "Did not find partitions 
with no leader")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testReconfigureRemovedListener(quorum: String): Unit = {
+    val client = adminClients.head
+    val broker = servers.head
+    assertEquals(2, 
broker.config.dynamicConfig.reconfigurables.asScala.count(r => 
r.isInstanceOf[DataPlaneAcceptor]))
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, 
broker.config.brokerId.toString)
+
+    def acceptors: Seq[DataPlaneAcceptor] = 
broker.config.dynamicConfig.reconfigurables.asScala.filter(_.isInstanceOf[DataPlaneAcceptor])
+      .map(_.asInstanceOf[DataPlaneAcceptor]).toSeq
+
+    // add new PLAINTEXT listener
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+        s"PLAINTEXT://localhost:0, $SecureInternal://localhost:0, 
$SecureExternal://localhost:0"), AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+
+    TestUtils.waitUntilTrue(() => acceptors.size == 3, s"failed to add new 
DataPlaneAcceptor")
+
+    // remove PLAINTEXT listener
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+        s"$SecureInternal://localhost:0, $SecureExternal://localhost:0"), 
AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+
+    TestUtils.waitUntilTrue(() => acceptors.size == 2,
+      s"failed to remove DataPlaneAcceptor. current: 
${acceptors.map(_.endPoint.toString).mkString(",")}")
+  }
+
   private def addListener(servers: Seq[KafkaBroker], listenerName: String, 
securityProtocol: SecurityProtocol,
                           saslMechanisms: Seq[String]): Unit = {
     val config = servers.head.config

Reply via email to