This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 7aaa31c MINOR: Brokers in KRaft don't need controller listener
(#11511)
7aaa31c is described below
commit 7aaa31c405acda1e47c42eee12a7b51379ea8f81
Author: José Armando García Sancio <[email protected]>
AuthorDate: Fri Nov 19 10:28:19 2021 -0700
MINOR: Brokers in KRaft don't need controller listener (#11511)
The KRaft brokers should not list the names in `controller.listener.names`
in `listeners` because brokers do not bind to those endpoints. This commit also
removes the extra changes to the security protocol map because the `PLAINTEXT`
protocol doesn't require additional configuration.
To fully support all of the security protocol configuration additional
changes to `QuorumTestHarness` are needed. Those changes can be made when
migrating integration tests that need this functionality.
Reviewers: Ron Dagostino <[email protected]>, Jason Gustafson
<[email protected]>
(cherry picked from commit 074a03cca162f91ccdecc12eb84c6a45af75f6bf)
---
.../kafka/api/IntegrationTestHarness.scala | 22 ----------------------
.../kafka/server/QuorumTestHarness.scala | 5 ++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
3 files changed, 3 insertions(+), 26 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 8989044..0f987e9 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -62,7 +62,6 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties,
logDirCount = logDirCount)
configureListeners(cfgs)
modifyConfigs(cfgs)
- insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}
@@ -81,27 +80,6 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
}
}
- private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit
= {
- if (isKRaftTest()) {
- props.foreach { config =>
- // Add the CONTROLLER listener to "listeners" if it is not already
there.
- // But do not add it to advertised.listeners.
- val listeners = config.getProperty(KafkaConfig.ListenersProp,
"").split(",")
- val listenerNames = listeners.map(_.replaceFirst(":.*", ""))
- if (!listenerNames.contains("CONTROLLER")) {
- config.setProperty(KafkaConfig.ListenersProp,
- (listeners ++ Seq("CONTROLLER://localhost:0")).mkString(","))
- }
- // Add a security protocol for the CONTROLLER endpoint, if one is not
already set.
- val securityPairs =
config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
- if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
- config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
- (securityPairs ++
Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
- }
- }
- }
- }
-
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
doSetup(testInfo, createOffsetsTopic = true)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 69be4f5..9c9fb72 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -103,10 +103,9 @@ abstract class QuorumTestHarness extends Logging {
protected def zkAclsEnabled: Option[Boolean] = None
/**
- * When in KRaft mode, the security protocol to use for the controller
listener.
- * Can be overridden by subclasses.
+ * When in KRaft mode, this test harness only support PLAINTEXT
*/
- protected def controllerListenerSecurityProtocol: SecurityProtocol =
SecurityProtocol.PLAINTEXT
+ private val controllerListenerSecurityProtocol: SecurityProtocol =
SecurityProtocol.PLAINTEXT
protected def kraftControllerConfigs(): Seq[Properties] = {
Seq(new Properties())
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 407f88d..99d7639 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -297,7 +297,7 @@ object TestUtils extends Logging {
props.put(KafkaConfig.NodeIdProp, nodeId.toString)
props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
- props.put(KafkaConfig.ListenersProp, listeners +
",CONTROLLER://localhost:0")
+ props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, protocolAndPorts.
map(p => "%s:%s".format(p._1, p._1)).mkString(",") +
",CONTROLLER:PLAINTEXT")