This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7438f100cf4 KAFKA-14774 the removed listeners should not be
reconfigurable (#13326)
7438f100cf4 is described below
commit 7438f100cf409a1ca178b15b6b7bcfd99e541098
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Mon Mar 27 18:48:31 2023 +0800
KAFKA-14774 the removed listeners should not be reconfigurable (#13326)
Reviewers: Mickael Maison <[email protected]>
---
.../main/scala/kafka/network/SocketServer.scala | 1 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 34 ++++++++++++++++++++--
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index feca5fc68b4..cdc8ece103c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -352,6 +352,7 @@ class SocketServer(val config: KafkaConfig,
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
acceptor.beginShutdown()
acceptor.close()
+ config.removeReconfigurable(acceptor)
}
}
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 940580d155b..7c6d5284d71 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -211,7 +211,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
// Use COWArrayList to prevent concurrent modification exception when an
item is added by one thread to these
// collections, while another thread is iterating over them.
- private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+ private[server] val reconfigurables = new
CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new
CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = _
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 661295e280c..b08c5360d96 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,7 +32,7 @@ import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.UnifiedLog
-import kafka.network.{Processor, RequestChannel}
+import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
@@ -77,6 +77,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.Seq
object DynamicBrokerReconfigurationTest {
+ val Plain = "PLAIN"
val SecureInternal = "INTERNAL"
val SecureExternal = "EXTERNAL"
}
@@ -125,7 +126,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
// Ensure that we can support multiple listeners per security protocol
and multiple security protocols
props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0,
$SecureExternal://localhost:0")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp,
s"$SecureInternal:SSL, $SecureExternal:SASL_SSL,
CONTROLLER:$controllerListenerSecurityProtocol")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp,
s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL,
CONTROLLER:$controllerListenerSecurityProtocol")
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
props.put(KafkaConfig.SslClientAuthProp, "requested")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
@@ -1167,6 +1168,35 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
assertTrue(partitions.exists(_.leader == null), "Did not find partitions
with no leader")
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testReconfigureRemovedListener(quorum: String): Unit = {
+ val client = adminClients.head
+ val broker = servers.head
+ assertEquals(2,
broker.config.dynamicConfig.reconfigurables.asScala.count(r =>
r.isInstanceOf[DataPlaneAcceptor]))
+ val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER,
broker.config.brokerId.toString)
+
+ def acceptors: Seq[DataPlaneAcceptor] =
broker.config.dynamicConfig.reconfigurables.asScala.filter(_.isInstanceOf[DataPlaneAcceptor])
+ .map(_.asInstanceOf[DataPlaneAcceptor]).toSeq
+
+ // add new PLAINTEXT listener
+ client.incrementalAlterConfigs(Map(broker0Resource ->
+ Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+ s"PLAINTEXT://localhost:0, $SecureInternal://localhost:0,
$SecureExternal://localhost:0"), AlterConfigOp.OpType.SET)
+ ).asJavaCollection).asJava).all().get()
+
+ TestUtils.waitUntilTrue(() => acceptors.size == 3, s"failed to add new
DataPlaneAcceptor")
+
+ // remove PLAINTEXT listener
+ client.incrementalAlterConfigs(Map(broker0Resource ->
+ Seq(new AlterConfigOp(new ConfigEntry(KafkaConfig.ListenersProp,
+ s"$SecureInternal://localhost:0, $SecureExternal://localhost:0"),
AlterConfigOp.OpType.SET)
+ ).asJavaCollection).asJava).all().get()
+
+ TestUtils.waitUntilTrue(() => acceptors.size == 2,
+ s"failed to remove DataPlaneAcceptor. current:
${acceptors.map(_.endPoint.toString).mkString(",")}")
+ }
+
private def addListener(servers: Seq[KafkaBroker], listenerName: String,
securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config