This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 255bf5a1d37 KAFKA-14774 the removed listeners should not be
reconfigurable (#13472)
255bf5a1d37 is described below
commit 255bf5a1d37ad404c9fe8ffd429c1331b9f60b6d
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Wed Mar 29 22:08:00 2023 +0800
KAFKA-14774 the removed listeners should not be reconfigurable (#13472)
Reviewers: Luke Chen <[email protected]>
---
.../main/scala/kafka/network/SocketServer.scala | 1 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 34 ++++++++++++++++++++--
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 0c08d7b056a..f5b7e689d50 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -323,6 +323,7 @@ class SocketServer(val config: KafkaConfig,
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
acceptor.beginShutdown()
acceptor.close()
+ config.removeReconfigurable(acceptor)
}
}
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 38f7722d343..e66b075b5c4 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -208,7 +208,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
// Use COWArrayList to prevent concurrent modification exception when an
item is added by one thread to these
// collections, while another thread is iterating over them.
- private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+ private[server] val reconfigurables = new
CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new
CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = _
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 92a6e457544..4f01432e167 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -33,7 +33,7 @@ import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.{CleanerConfig, LogConfig, UnifiedLog}
import kafka.message.ProducerCompressionCodec
-import kafka.network.{Processor, RequestChannel}
+import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
@@ -75,6 +75,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.Seq
object DynamicBrokerReconfigurationTest {
+ val Plain = "PLAIN"
val SecureInternal = "INTERNAL"
val SecureExternal = "EXTERNAL"
}
@@ -123,7 +124,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
// Ensure that we can support multiple listeners per security protocol
and multiple security protocols
props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0,
$SecureExternal://localhost:0")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp,
s"$SecureInternal:SSL, $SecureExternal:SASL_SSL,
CONTROLLER:$controllerListenerSecurityProtocol")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp,
s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL,
CONTROLLER:$controllerListenerSecurityProtocol")
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
props.put(KafkaConfig.SslClientAuthProp, "requested")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
@@ -1165,6 +1166,35 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
assertTrue(partitions.exists(_.leader == null), "Did not find partitions
with no leader")
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testReconfigureRemovedListener(quorum: String): Unit = {
+ val client = adminClients.head
+ val broker = servers.head
+ assertEquals(2,
broker.config.dynamicConfig.reconfigurables.asScala.count(r =>
r.isInstanceOf[DataPlaneAcceptor]))
+ val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER,
broker.config.brokerId.toString)
+
+ def acceptors: Seq[DataPlaneAcceptor] =
broker.config.dynamicConfig.reconfigurables.asScala.filter(_.isInstanceOf[DataPlaneAcceptor])
+ .map(_.asInstanceOf[DataPlaneAcceptor]).toSeq
+
+ // add new PLAINTEXT listener
+ client.incrementalAlterConfigs(Map(broker0Resource ->
+ Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+ s"PLAINTEXT://localhost:0, $SecureInternal://localhost:0,
$SecureExternal://localhost:0"), AlterConfigOp.OpType.SET)
+ ).asJavaCollection).asJava).all().get()
+
+ TestUtils.waitUntilTrue(() => acceptors.size == 3, s"failed to add new
DataPlaneAcceptor")
+
+ // remove PLAINTEXT listener
+ client.incrementalAlterConfigs(Map(broker0Resource ->
+ Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+ s"$SecureInternal://localhost:0, $SecureExternal://localhost:0"),
AlterConfigOp.OpType.SET)
+ ).asJavaCollection).asJava).all().get()
+
+ TestUtils.waitUntilTrue(() => acceptors.size == 2,
+ s"failed to remove DataPlaneAcceptor. current:
${acceptors.map(_.endPoint.toString).mkString(",")}")
+ }
+
private def addListener(servers: Seq[KafkaBroker], listenerName: String,
securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config