This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 73a54c5 KAFKA-13003: In kraft mode also advertise configured
advertised port instead of socket port (#10935)
73a54c5 is described below
commit 73a54c5265231b3d764391b70572626a876f1e27
Author: Uwe Eisele <[email protected]>
AuthorDate: Mon Jul 12 22:40:09 2021 +0200
KAFKA-13003: In kraft mode also advertise configured advertised port
instead of socket port (#10935)
In Kraft mode, Apache Kafka 2.8.0 advertises the socket port instead of the
configured advertised port.
A broker with the following configuration:
listeners=PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091
advertised.listeners=PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091
advertises on the PUBLIC listener envoy-kafka-broker:19092, however I would
expect that
envoy-kafka-broker:9091 is advertised. In ZooKeeper mode it works as
expected. This PR
changes the BrokerServer class so that in Kraft mode the configured
advertised port is
registered as expected.
Reviewers: Jason Gustafson <[email protected]>, Ismael Juma
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
core/src/test/java/kafka/testkit/BrokerNode.java | 12 ++-
core/src/test/java/kafka/testkit/TestKitNodes.java | 2 +-
.../integration/kafka/server/RaftClusterTest.scala | 113 ++++++++++++++++++++-
4 files changed, 124 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 16856f8..21f90b1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -311,7 +311,7 @@ class BrokerServer(
networkListeners.add(new Listener().
setHost(if (Utils.isBlank(ep.host))
InetAddress.getLocalHost.getCanonicalHostName else ep.host).
setName(ep.listenerName.value()).
- setPort(socketServer.boundPort(ep.listenerName)).
+ setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName)
else ep.port).
setSecurityProtocol(ep.securityProtocol.id))
}
lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java
b/core/src/test/java/kafka/testkit/BrokerNode.java
index 32bd51b..005d498 100644
--- a/core/src/test/java/kafka/testkit/BrokerNode.java
+++ b/core/src/test/java/kafka/testkit/BrokerNode.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static java.util.Collections.emptyMap;
+
public class BrokerNode implements TestKitNode {
public static class Builder {
private int id = -1;
@@ -76,11 +78,19 @@ public class BrokerNode implements TestKitNode {
Uuid incarnationId,
String metadataDirectory,
List<String> logDataDirectories) {
+ this(id, incarnationId, metadataDirectory, logDataDirectories,
emptyMap());
+ }
+
+ BrokerNode(int id,
+ Uuid incarnationId,
+ String metadataDirectory,
+ List<String> logDataDirectories,
+ Map<String, String> propertyOverrides) {
this.id = id;
this.incarnationId = incarnationId;
this.metadataDirectory = metadataDirectory;
this.logDataDirectories = new ArrayList<>(logDataDirectories);
- this.propertyOverrides = new HashMap<>();
+ this.propertyOverrides = new HashMap<>(propertyOverrides);
}
@Override
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java
b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 2950887..d52b800 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -159,7 +159,7 @@ public class TestKitNodes {
BrokerNode node = entry.getValue();
newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(),
node.incarnationId(), absolutize(baseDirectory,
node.metadataDirectory()),
- absolutize(baseDirectory, node.logDataDirectories())));
+ absolutize(baseDirectory, node.logDataDirectories()),
node.propertyOverrides()));
}
return new TestKitNodes(clusterId, newControllerNodes, newBrokerNodes);
}
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
index fce5af9..30e04f1 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
@@ -17,16 +17,22 @@
package kafka.server
-import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.network.SocketServer
+import kafka.server.IntegrationTestUtils.connectAndReceive
+import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.message.DescribeClusterRequestData
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.common.requests.{DescribeClusterRequest,
DescribeClusterResponse}
import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
import java.util
import java.util.Collections
+import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@Timeout(120)
@@ -313,4 +319,107 @@ class RaftClusterTest {
cluster.close()
}
}
+
+ @Test
+ def testCreateClusterWithAdvertisedPortZero(): Unit = {
+ val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String,
String] = (nodes, _) => Map(
+ (KafkaConfig.ListenersProp,
s"${nodes.externalListenerName.value}://localhost:0"),
+ (KafkaConfig.AdvertisedListenersProp,
s"${nodes.externalListenerName.value}://localhost:0"))
+
+ doOnStartedKafkaCluster(numBrokerNodes = 3, brokerPropertyOverrides =
brokerPropertyOverrides) { implicit cluster =>
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName,
(15L, SECONDS))
+ .nodes.values.forEach { broker =>
+ assertEquals("localhost", broker.host,
+ "Did not advertise configured advertised host")
+
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.externalListenerName),
broker.port,
+ "Did not advertise bound socket port")
+ }
+ }
+ }
+
+ @Test
+ def testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer():
Unit = {
+ val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String,
String] = (nodes, broker) => Map(
+ (KafkaConfig.ListenersProp,
s"${nodes.externalListenerName.value}://localhost:0"),
+ (KafkaConfig.AdvertisedListenersProp,
s"${nodes.externalListenerName.value}://advertised-host-${broker.id}:${broker.id
+ 100}"))
+
+ doOnStartedKafkaCluster(numBrokerNodes = 3, brokerPropertyOverrides =
brokerPropertyOverrides) { implicit cluster =>
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.externalListenerName,
(15L, SECONDS))
+ .nodes.values.forEach { broker =>
+ assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not
advertise configured advertised host")
+ assertEquals(broker.id + 100, broker.port, "Did not advertise
configured advertised port")
+ }
+ }
+ }
+
+ private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
+ numBrokerNodes: Int,
+ brokerPropertyOverrides: (TestKitNodes,
BrokerNode) => Map[String, String])
+ (action: KafkaClusterTestKit => Unit):
Unit = {
+ val nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(numControllerNodes)
+ .setNumBrokerNodes(numBrokerNodes)
+ .build()
+ nodes.brokerNodes.values.forEach {
+ broker => broker.propertyOverrides.putAll(brokerPropertyOverrides(nodes,
broker).asJava)
+ }
+ val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ action(cluster)
+ } finally {
+ cluster.close()
+ }
+ }
+
+ private def
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(listenerName:
ListenerName,
+
waitTime: FiniteDuration)
+
(implicit cluster: KafkaClusterTestKit): DescribeClusterResponse = {
+ val startTime = System.currentTimeMillis
+ val runningBrokerServers = waitForRunningBrokers(1, waitTime)
+ val remainingWaitTime = waitTime - (System.currentTimeMillis - startTime,
MILLISECONDS)
+ sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(
+ runningBrokerServers.head, listenerName,
+ cluster.nodes.brokerNodes.size, remainingWaitTime)
+ }
+
+ private def waitForRunningBrokers(count: Int, waitTime: FiniteDuration)
+ (implicit cluster: KafkaClusterTestKit):
Seq[BrokerServer] = {
+ def getRunningBrokerServers: Seq[BrokerServer] =
cluster.brokers.values.asScala.toSeq
+ .filter(brokerServer => brokerServer.currentState() ==
BrokerState.RUNNING)
+
+ val (runningBrokerServers, hasRunningBrokers) =
TestUtils.computeUntilTrue(getRunningBrokerServers,
waitTime.toMillis)(_.nonEmpty)
+ assertTrue(hasRunningBrokers,
+ s"After ${waitTime.toMillis} ms at least $count broker(s) should be in
RUNNING state, " +
+ s"but only ${runningBrokerServers.size} broker(s) are.")
+ runningBrokerServers
+ }
+
+ private def
sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(destination:
BrokerServer,
+
listenerName: ListenerName,
+
expectedBrokerCount: Int,
+
waitTime: FiniteDuration): DescribeClusterResponse = {
+ val (describeClusterResponse, metadataUpToDate) =
TestUtils.computeUntilTrue(
+ compute =
sendDescribeClusterRequestToBoundPort(destination.socketServer, listenerName),
+ waitTime = waitTime.toMillis
+ ) {
+ response => response.nodes.size == expectedBrokerCount
+ }
+
+ assertTrue(metadataUpToDate,
+ s"After ${waitTime.toMillis} ms Broker is only aware of
${describeClusterResponse.nodes.size} brokers, " +
+ s"but $expectedBrokerCount are expected.")
+
+ describeClusterResponse
+ }
+
+ private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
+ listenerName:
ListenerName): DescribeClusterResponse =
+ connectAndReceive[DescribeClusterResponse](
+ request = new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()).build(),
+ destination = destination,
+ listenerName = listenerName
+ )
+
}