This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 73a54c5  KAFKA-13003: In kraft mode also advertise configured 
advertised port instead of socket port (#10935)
73a54c5 is described below

commit 73a54c5265231b3d764391b70572626a876f1e27
Author: Uwe Eisele <[email protected]>
AuthorDate: Mon Jul 12 22:40:09 2021 +0200

    KAFKA-13003: In kraft mode also advertise configured advertised port 
instead of socket port (#10935)
    
    In Kraft mode, Apache Kafka 2.8.0 advertises the socket port instead of the 
configured advertised port.
    A broker with the following configuration:
    
    listeners=PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091
    
advertised.listeners=PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091
    
    advertises envoy-kafka-broker:19092 on the PUBLIC listener; however, I would 
expect
    envoy-kafka-broker:9091 to be advertised. In ZooKeeper mode it works as 
expected. This PR
    changes the BrokerServer class so that in Kraft mode the configured 
advertised port is
    registered as expected.
    
    Reviewers: Jason Gustafson <[email protected]>, Ismael Juma 
<[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 core/src/test/java/kafka/testkit/BrokerNode.java   |  12 ++-
 core/src/test/java/kafka/testkit/TestKitNodes.java |   2 +-
 .../integration/kafka/server/RaftClusterTest.scala | 113 ++++++++++++++++++++-
 4 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 16856f8..21f90b1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -311,7 +311,7 @@ class BrokerServer(
         networkListeners.add(new Listener().
           setHost(if (Utils.isBlank(ep.host)) 
InetAddress.getLocalHost.getCanonicalHostName else ep.host).
           setName(ep.listenerName.value()).
-          setPort(socketServer.boundPort(ep.listenerName)).
+          setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) 
else ep.port).
           setSecurityProtocol(ep.securityProtocol.id))
       }
       lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java 
b/core/src/test/java/kafka/testkit/BrokerNode.java
index 32bd51b..005d498 100644
--- a/core/src/test/java/kafka/testkit/BrokerNode.java
+++ b/core/src/test/java/kafka/testkit/BrokerNode.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.emptyMap;
+
 public class BrokerNode implements TestKitNode {
     public static class Builder {
         private int id = -1;
@@ -76,11 +78,19 @@ public class BrokerNode implements TestKitNode {
                Uuid incarnationId,
                String metadataDirectory,
                List<String> logDataDirectories) {
+        this(id, incarnationId, metadataDirectory, logDataDirectories, 
emptyMap());
+    }
+
+    BrokerNode(int id,
+               Uuid incarnationId,
+               String metadataDirectory,
+               List<String> logDataDirectories,
+               Map<String, String> propertyOverrides) {
         this.id = id;
         this.incarnationId = incarnationId;
         this.metadataDirectory = metadataDirectory;
         this.logDataDirectories = new ArrayList<>(logDataDirectories);
-        this.propertyOverrides = new HashMap<>();
+        this.propertyOverrides = new HashMap<>(propertyOverrides);
     }
 
     @Override
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java 
b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 2950887..d52b800 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -159,7 +159,7 @@ public class TestKitNodes {
             BrokerNode node = entry.getValue();
             newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(),
                 node.incarnationId(), absolutize(baseDirectory, 
node.metadataDirectory()),
-                absolutize(baseDirectory, node.logDataDirectories())));
+                absolutize(baseDirectory, node.logDataDirectories()), 
node.propertyOverrides()));
         }
         return new TestKitNodes(clusterId, newControllerNodes, newBrokerNodes);
     }
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
index fce5af9..30e04f1 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
@@ -17,16 +17,22 @@
 
 package kafka.server
 
-import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.network.SocketServer
+import kafka.server.IntegrationTestUtils.connectAndReceive
+import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.message.DescribeClusterRequestData
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.common.requests.{DescribeClusterRequest, 
DescribeClusterResponse}
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
 
 import java.util
 import java.util.Collections
+import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
 
 @Timeout(120)
@@ -313,4 +319,107 @@ class RaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testCreateClusterWithAdvertisedPortZero(): Unit = {
+    val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, 
String] = (nodes, _) => Map(
+      (KafkaConfig.ListenersProp, 
s"${nodes.externalListenerName.value}://localhost:0"),
+      (KafkaConfig.AdvertisedListenersProp, 
s"${nodes.externalListenerName.value}://localhost:0"))
+
+    doOnStartedKafkaCluster(numBrokerNodes = 3, brokerPropertyOverrides = 
brokerPropertyOverrides) { implicit cluster =>
+      
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName,
 (15L, SECONDS))
+        .nodes.values.forEach { broker =>
+          assertEquals("localhost", broker.host,
+            "Did not advertise configured advertised host")
+          
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.externalListenerName),
 broker.port,
+            "Did not advertise bound socket port")
+        }
+    }
+  }
+
+  @Test
+  def testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer(): 
Unit = {
+    val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, 
String] = (nodes, broker) => Map(
+      (KafkaConfig.ListenersProp, 
s"${nodes.externalListenerName.value}://localhost:0"),
+      (KafkaConfig.AdvertisedListenersProp, 
s"${nodes.externalListenerName.value}://advertised-host-${broker.id}:${broker.id
 + 100}"))
+
+    doOnStartedKafkaCluster(numBrokerNodes = 3, brokerPropertyOverrides = 
brokerPropertyOverrides) { implicit cluster =>
+      
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName,
 (15L, SECONDS))
+        .nodes.values.forEach { broker =>
+          assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not 
advertise configured advertised host")
+          assertEquals(broker.id + 100, broker.port, "Did not advertise 
configured advertised port")
+        }
+    }
+  }
+
+  private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
+                                      numBrokerNodes: Int,
+                                      brokerPropertyOverrides: (TestKitNodes, 
BrokerNode) => Map[String, String])
+                                     (action: KafkaClusterTestKit => Unit): 
Unit = {
+    val nodes = new TestKitNodes.Builder()
+      .setNumControllerNodes(numControllerNodes)
+      .setNumBrokerNodes(numBrokerNodes)
+      .build()
+    nodes.brokerNodes.values.forEach {
+      broker => broker.propertyOverrides.putAll(brokerPropertyOverrides(nodes, 
broker).asJava)
+    }
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      action(cluster)
+    } finally {
+      cluster.close()
+    }
+  }
+
+  private def 
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(listenerName: 
ListenerName,
+                                                                             
waitTime: FiniteDuration)
+                                                                            
(implicit cluster: KafkaClusterTestKit): DescribeClusterResponse = {
+    val startTime = System.currentTimeMillis
+    val runningBrokerServers = waitForRunningBrokers(1, waitTime)
+    val remainingWaitTime = waitTime - (System.currentTimeMillis - startTime, 
MILLISECONDS)
+    sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(
+      runningBrokerServers.head, listenerName,
+      cluster.nodes.brokerNodes.size, remainingWaitTime)
+  }
+
+  private def waitForRunningBrokers(count: Int, waitTime: FiniteDuration)
+                                   (implicit cluster: KafkaClusterTestKit): 
Seq[BrokerServer] = {
+    def getRunningBrokerServers: Seq[BrokerServer] = 
cluster.brokers.values.asScala.toSeq
+      .filter(brokerServer => brokerServer.currentState() == 
BrokerState.RUNNING)
+
+    val (runningBrokerServers, hasRunningBrokers) = 
TestUtils.computeUntilTrue(getRunningBrokerServers, 
waitTime.toMillis)(_.nonEmpty)
+    assertTrue(hasRunningBrokers,
+      s"After ${waitTime.toMillis} ms at least $count broker(s) should be in 
RUNNING state, " +
+        s"but only ${runningBrokerServers.size} broker(s) are.")
+    runningBrokerServers
+  }
+
+  private def 
sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(destination: 
BrokerServer,
+                                                                          
listenerName: ListenerName,
+                                                                          
expectedBrokerCount: Int,
+                                                                          
waitTime: FiniteDuration): DescribeClusterResponse = {
+    val (describeClusterResponse, metadataUpToDate) = 
TestUtils.computeUntilTrue(
+      compute = 
sendDescribeClusterRequestToBoundPort(destination.socketServer, listenerName),
+      waitTime = waitTime.toMillis
+    ) {
+      response => response.nodes.size == expectedBrokerCount
+    }
+
+    assertTrue(metadataUpToDate,
+      s"After ${waitTime.toMillis} ms Broker is only aware of 
${describeClusterResponse.nodes.size} brokers, " +
+        s"but $expectedBrokerCount are expected.")
+
+    describeClusterResponse
+  }
+
+  private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
+                                                    listenerName: 
ListenerName): DescribeClusterResponse =
+    connectAndReceive[DescribeClusterResponse](
+      request = new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()).build(),
+      destination = destination,
+      listenerName = listenerName
+    )
+
 }

Reply via email to