This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 7aaa31c  MINOR: Brokers in KRaft don't need controller listener 
(#11511)
7aaa31c is described below

commit 7aaa31c405acda1e47c42eee12a7b51379ea8f81
Author: José Armando García Sancio <[email protected]>
AuthorDate: Fri Nov 19 10:28:19 2021 -0700

    MINOR: Brokers in KRaft don't need controller listener (#11511)
    
    The KRaft brokers should not list the names in `controller.listener.names` 
in `listeners` because brokers do not bind to those endpoints. This commit also 
removes the extra changes to the security protocol map because the `PLAINTEXT` 
protocol doesn't require additional configuration.
    
    To fully support all of the security protocol configuration additional 
changes to `QuorumTestHarness` are needed. Those changes can be made when 
migrating integration tests that need this functionality.
    
    Reviewers: Ron Dagostino <[email protected]>, Jason Gustafson 
<[email protected]>
    (cherry picked from commit 074a03cca162f91ccdecc12eb84c6a45af75f6bf)
---
 .../kafka/api/IntegrationTestHarness.scala         | 22 ----------------------
 .../kafka/server/QuorumTestHarness.scala           |  5 ++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 3 files changed, 3 insertions(+), 26 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 8989044..0f987e9 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -62,7 +62,6 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
logDirCount = logDirCount)
     configureListeners(cfgs)
     modifyConfigs(cfgs)
-    insertControllerListenersIfNeeded(cfgs)
     cfgs.map(KafkaConfig.fromProps)
   }
 
@@ -81,27 +80,6 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     }
   }
 
-  private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit 
= {
-    if (isKRaftTest()) {
-      props.foreach { config =>
-        // Add the CONTROLLER listener to "listeners" if it is not already 
there.
-        // But do not add it to advertised.listeners.
-        val listeners = config.getProperty(KafkaConfig.ListenersProp, 
"").split(",")
-        val listenerNames = listeners.map(_.replaceFirst(":.*", ""))
-        if (!listenerNames.contains("CONTROLLER")) {
-          config.setProperty(KafkaConfig.ListenersProp,
-            (listeners ++ Seq("CONTROLLER://localhost:0")).mkString(","))
-        }
-        // Add a security protocol for the CONTROLLER endpoint, if one is not 
already set.
-        val securityPairs = 
config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
-        if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
-          config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
-            (securityPairs ++ 
Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
-        }
-      }
-    }
-  }
-
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     doSetup(testInfo, createOffsetsTopic = true)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 69be4f5..9c9fb72 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -103,10 +103,9 @@ abstract class QuorumTestHarness extends Logging {
   protected def zkAclsEnabled: Option[Boolean] = None
 
   /**
-   * When in KRaft mode, the security protocol to use for the controller 
listener.
-   * Can be overridden by subclasses.
+   * When in KRaft mode, this test harness only support PLAINTEXT
    */
-  protected def controllerListenerSecurityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT
+  private val controllerListenerSecurityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT
 
   protected def kraftControllerConfigs(): Seq[Properties] = {
     Seq(new Properties())
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 407f88d..99d7639 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -297,7 +297,7 @@ object TestUtils extends Logging {
       props.put(KafkaConfig.NodeIdProp, nodeId.toString)
       props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
       props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-      props.put(KafkaConfig.ListenersProp, listeners + 
",CONTROLLER://localhost:0")
+      props.put(KafkaConfig.ListenersProp, listeners)
       props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
       props.put(KafkaConfig.ListenerSecurityProtocolMapProp, protocolAndPorts.
         map(p => "%s:%s".format(p._1, p._1)).mkString(",") + 
",CONTROLLER:PLAINTEXT")

Reply via email to