This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 747dc172e87 KIP-1073: Return fenced brokers in DescribeCluster
response (#17524)
747dc172e87 is described below
commit 747dc172e874572ca2e3f917606028de205488d6
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Fri Dec 13 02:58:11 2024 +0000
KIP-1073: Return fenced brokers in DescribeCluster response (#17524)
Implementation of KIP-1073: Return fenced brokers in DescribeCluster
response.
Add new unit and integration tests for describeCluster.
Reviewers: Luke Chen <[email protected]>
---
.../clients/admin/DescribeClusterOptions.java | 15 ++++++
.../kafka/clients/admin/KafkaAdminClient.java | 13 ++++-
.../main/java/org/apache/kafka/common/Node.java | 26 +++++++--
.../common/requests/DescribeClusterResponse.java | 2 +-
.../common/message/DescribeClusterRequest.json | 7 ++-
.../common/message/DescribeClusterResponse.json | 7 ++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 17 ++++++
core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++-
.../main/scala/kafka/server/MetadataCache.scala | 2 +
.../kafka/server/metadata/KRaftMetadataCache.scala | 4 ++
.../kafka/server/metadata/ZkMetadataCache.scala | 4 ++
.../kafka/api/PlaintextAdminIntegrationTest.scala | 24 +++++++++
.../kafka/api/SslAdminIntegrationTest.scala | 23 +++++++-
.../apache/kafka/metadata/BrokerRegistration.java | 2 +-
.../kafka/metadata/BrokerRegistrationTest.java | 2 +-
.../java/org/apache/kafka/tools/ClusterTool.java | 62 +++++++++++++++++++++-
.../kafka/tools/BrokerApiVersionsCommandTest.java | 2 +-
.../org/apache/kafka/tools/ClusterToolTest.java | 50 +++++++++++++++++
18 files changed, 252 insertions(+), 18 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index 2eac1f055f6..a6ef6f7f0f3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -29,6 +29,8 @@ public class DescribeClusterOptions extends
AbstractOptions<DescribeClusterOptio
private boolean includeAuthorizedOperations;
+ private boolean includeFencedBrokers;
+
/**
* Set the timeout in milliseconds for this operation or {@code null} if
the default api timeout for the
* AdminClient should be used.
@@ -45,6 +47,11 @@ public class DescribeClusterOptions extends
AbstractOptions<DescribeClusterOptio
return this;
}
+ public DescribeClusterOptions includeFencedBrokers(boolean
includeFencedBrokers) {
+ this.includeFencedBrokers = includeFencedBrokers;
+ return this;
+ }
+
/**
* Specify if authorized operations should be included in the response.
Note that some
* older brokers cannot not supply this information even if it is
requested.
@@ -52,4 +59,12 @@ public class DescribeClusterOptions extends
AbstractOptions<DescribeClusterOptio
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
+
+ /**
+ * Specify if fenced brokers should be included in the response. Note
that some
+ * older brokers cannot supply this information even if it is
requested.
+ */
+ public boolean includeFencedBrokers() {
+ return includeFencedBrokers;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index bd135acc74b..b7e482720b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2504,10 +2504,14 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
if (!useMetadataRequest) {
+ if (metadataManager.usingBootstrapControllers() &&
options.includeFencedBrokers()) {
+ throw new IllegalArgumentException("Cannot request
fenced brokers from controller endpoint");
+ }
return new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
.setEndpointType(metadataManager.usingBootstrapControllers() ?
- EndpointType.CONTROLLER.id() :
EndpointType.BROKER.id()));
+ EndpointType.CONTROLLER.id() :
EndpointType.BROKER.id())
+
.setIncludeFencedBrokers(options.includeFencedBrokers()));
} else {
// Since this only requests node information, it's safe to
pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
@@ -2523,7 +2527,6 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
if (!useMetadataRequest) {
DescribeClusterResponse response =
(DescribeClusterResponse) abstractResponse;
-
Errors error = Errors.forCode(response.data().errorCode());
if (error != Errors.NONE) {
ApiError apiError = new ApiError(error,
response.data().errorMessage());
@@ -2571,6 +2574,12 @@ public class KafkaAdminClient extends AdminClient {
return false;
}
+ // If unsupportedVersion exception was caused by the option to
include fenced brokers (only supported for version 2+)
+ // then we should not fall back to the metadataRequest.
+ if (options.includeFencedBrokers()) {
+ return false;
+ }
+
useMetadataRequest = true;
return true;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java
b/clients/src/main/java/org/apache/kafka/common/Node.java
index 020d2bcaf33..e47d941e0f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -30,12 +30,13 @@ public class Node {
private final String host;
private final int port;
private final String rack;
+ private final boolean isFenced;
// Cache hashCode as it is called in performance sensitive parts of the
code (e.g. RecordAccumulator.ready)
private Integer hash;
public Node(int id, String host, int port) {
- this(id, host, port, null);
+ this(id, host, port, null, false);
}
public Node(int id, String host, int port, String rack) {
@@ -44,6 +45,16 @@ public class Node {
this.host = host;
this.port = port;
this.rack = rack;
+ this.isFenced = false;
+ }
+
+ public Node(int id, String host, int port, String rack, boolean isFenced) {
+ this.id = id;
+ this.idString = Integer.toString(id);
+ this.host = host;
+ this.port = port;
+ this.rack = rack;
+ this.isFenced = isFenced;
}
public static Node noNode() {
@@ -102,6 +113,13 @@ public class Node {
return rack;
}
+ /**
+ * Whether this node is fenced
+ */
+ public boolean isFenced() {
+ return isFenced;
+ }
+
@Override
public int hashCode() {
Integer h = this.hash;
@@ -110,6 +128,7 @@ public class Node {
result = 31 * result + id;
result = 31 * result + port;
result = 31 * result + ((rack == null) ? 0 : rack.hashCode());
+ result = 31 * result + Objects.hashCode(isFenced);
this.hash = result;
return result;
} else {
@@ -127,12 +146,13 @@ public class Node {
return id == other.id &&
port == other.port &&
Objects.equals(host, other.host) &&
- Objects.equals(rack, other.rack);
+ Objects.equals(rack, other.rack) &&
+ Objects.equals(isFenced, other.isFenced);
}
@Override
public String toString() {
- return host + ":" + port + " (id: " + idString + " rack: " + rack +
")";
+ return host + ":" + port + " (id: " + idString + " rack: " + rack + "
isFenced: " + isFenced + ")";
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
index 4964a8a8a9d..7c892874214 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
@@ -39,7 +39,7 @@ public class DescribeClusterResponse extends AbstractResponse
{
public Map<Integer, Node> nodes() {
return data.brokers().valuesList().stream()
- .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack()))
+ .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack(),
b.isFenced()))
.collect(Collectors.toMap(Node::id, Function.identity()));
}
diff --git
a/clients/src/main/resources/common/message/DescribeClusterRequest.json
b/clients/src/main/resources/common/message/DescribeClusterRequest.json
index 34ebe013bb1..71e00df09b2 100644
--- a/clients/src/main/resources/common/message/DescribeClusterRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json
@@ -20,13 +20,16 @@
"name": "DescribeClusterRequest",
//
// Version 1 adds EndpointType for KIP-919 support.
+ // Version 2 adds IncludeFencedBrokers for KIP-1073 support.
//
- "validVersions": "0-1",
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool",
"versions": "0+",
"about": "Whether to include cluster authorized operations." },
{ "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
- "about": "The endpoint type to describe. 1=brokers, 2=controllers." }
+ "about": "The endpoint type to describe. 1=brokers, 2=controllers." },
+ { "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
+ "about": "Whether to include fenced brokers when listing brokers." }
]
}
diff --git
a/clients/src/main/resources/common/message/DescribeClusterResponse.json
b/clients/src/main/resources/common/message/DescribeClusterResponse.json
index cd30dcfe18c..a17e427c8c3 100644
--- a/clients/src/main/resources/common/message/DescribeClusterResponse.json
+++ b/clients/src/main/resources/common/message/DescribeClusterResponse.json
@@ -20,8 +20,9 @@
//
// Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE
and
// UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
+ // Version 2 adds IsFenced field to Brokers for KIP-1073 support.
//
- "validVersions": "0-1",
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -45,7 +46,9 @@
{ "name": "Port", "type": "int32", "versions": "0+",
"about": "The broker port." },
{ "name": "Rack", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The rack of the broker, or null if it has not been assigned
to a rack." }
+ "about": "The rack of the broker, or null if it has not been assigned
to a rack." },
+ { "name": "IsFenced", "type": "bool", "versions": "2+",
+ "about": "Whether the broker is fenced" }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions":
"0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this
cluster." }
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 44f6e1f5a88..1d1ae3e884b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3166,6 +3166,23 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void
testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers() {
+ ApiVersion describeClusterV1 = new ApiVersion()
+ .setApiKey(ApiKeys.DESCRIBE_CLUSTER.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 1);
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(describeClusterV1)));
+
+ env.kafkaClient().prepareUnsupportedVersionResponse(
+ request -> request instanceof DescribeClusterRequest);
+
+ final DescribeClusterResult result =
env.adminClient().describeCluster(new
DescribeClusterOptions().includeFencedBrokers(true));
+ TestUtils.assertFutureThrows(result.nodes(),
UnsupportedVersionException.class);
+ }
+ }
+
@Test
public void testListConsumerGroups() throws Exception {
try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(4, 0),
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 566b04dfd25..405caf240b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3617,12 +3617,16 @@ class KafkaApis(val requestChannel: RequestChannel,
clusterId,
() => {
val brokers = new
DescribeClusterResponseData.DescribeClusterBrokerCollection()
-
metadataCache.getAliveBrokerNodes(request.context.listenerName).foreach { node
=>
+ val describeClusterRequest = request.body[DescribeClusterRequest]
+ metadataCache.getBrokerNodes(request.context.listenerName).foreach {
node =>
+ if (!node.isFenced ||
describeClusterRequest.data().includeFencedBrokers()) {
brokers.add(new DescribeClusterResponseData.DescribeClusterBroker().
setBrokerId(node.id).
setHost(node.host).
setPort(node.port).
- setRack(node.rack))
+ setRack(node.rack).
+ setIsFenced(node.isFenced))
+ }
}
brokers
},
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index c98431c44e9..562c9d0ce4a 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -75,6 +75,8 @@ trait MetadataCache {
def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node]
+ def getBrokerNodes(listenerName: ListenerName): Iterable[Node]
+
def getPartitionInfo(topic: String, partitionId: Int):
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
/**
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 5fad48f8a71..5d7484714d0 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -381,6 +381,10 @@ class KRaftMetadataCache(
flatMap(_.node(listenerName.value()).toScala).toSeq
}
+ override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = {
+
_currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).asScala).toSeq
+ }
+
// Does NOT include offline replica metadata
override def getPartitionInfo(topicName: String, partitionId: Int):
Option[UpdateMetadataPartitionState] = {
Option(_currentImage.topics().getTopic(topicName)).
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 36684f7ef0f..d7f1d868466 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -353,6 +353,10 @@ class ZkMetadataCache(
metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName))
}
+ override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
+ getAliveBrokerNodes(listenerName)
+ }
+
def getTopicId(topicName: String): Uuid = {
metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID)
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index bd381f0306e..cff801b2b50 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -595,6 +595,30 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testListNodesWithFencedBroker(quorum: String): Unit = {
+ client = createAdminClient
+ val fencedBrokerId = brokers.last.config.brokerId
+ killBroker(fencedBrokerId, JDuration.ofMillis(0))
+ // It takes a few seconds for a broker to get fenced after being killed
+ // So we retry until only 2 of 3 brokers returned in the result or the max
wait is reached
+ TestUtils.retry(20000) {
+
assertTrue(client.describeCluster().nodes().get().asScala.size.equals(brokers.size
- 1))
+ }
+
+ // List nodes again but this time include the fenced broker
+ val nodes = client.describeCluster(new
DescribeClusterOptions().includeFencedBrokers(true)).nodes().get().asScala
+ assertTrue(nodes.size.equals(brokers.size))
+ nodes.foreach(node => {
+ if (node.id().equals(fencedBrokerId)) {
+ assertTrue(node.isFenced)
+ } else {
+ assertFalse(node.isFenced)
+ }
+ })
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {
diff --git
a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 5063e79ad08..5fcc0449bb5 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -18,7 +18,7 @@ import java.util.Properties
import com.yammer.metrics.core.Gauge
import kafka.security.JaasTestUtils
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult}
+import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult,
DescribeClusterOptions}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -32,7 +32,7 @@ import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer,
StandardAuthorizer}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -158,6 +158,25 @@ class SslAdminIntegrationTest extends
SaslSslAdminIntegrationTest {
super.tearDown()
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testListNodesFromControllersIncludingFencedBrokers(quorum: String): Unit
= {
+ useBoostrapControllers()
+ client = createAdminClient
+ val result = client.describeCluster(new
DescribeClusterOptions().includeFencedBrokers(true))
+ val exception = assertThrows(classOf[Exception], () => {
result.nodes().get()})
+ assertTrue(exception.getCause.getCause.getMessage.contains("Cannot request
fenced brokers from controller endpoint"))
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testListNodesFromControllers(quorum: String): Unit = {
+ useBoostrapControllers()
+ client = createAdminClient
+ val result = client.describeCluster(new DescribeClusterOptions())
+ assertTrue(result.nodes().get().size().equals(controllerServers.size))
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = {
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 2bd77f06e11..a7409da27a4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -233,7 +233,7 @@ public class BrokerRegistration {
if (endpoint == null) {
return Optional.empty();
}
- return Optional.of(new Node(id, endpoint.host(), endpoint.port(),
rack.orElse(null)));
+ return Optional.of(new Node(id, endpoint.host(), endpoint.port(),
rack.orElse(null), fenced));
}
public Map<String, VersionRange> supportedFeatures() {
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 485a39aad8b..e45234d225e 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -159,7 +159,7 @@ public class BrokerRegistrationTest {
assertEquals(Optional.empty(),
REGISTRATIONS.get(0).node("NONEXISTENT"));
assertEquals(Optional.of(new Node(0, "localhost", 9090, null)),
REGISTRATIONS.get(0).node("INTERNAL"));
- assertEquals(Optional.of(new Node(1, "localhost", 9091, null)),
+ assertEquals(Optional.of(new Node(1, "localhost", 9091, null, true)),
REGISTRATIONS.get(1).node("INTERNAL"));
assertEquals(Optional.of(new Node(2, "localhost", 9092, "myrack")),
REGISTRATIONS.get(2).node("INTERNAL"));
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index f4699d221e2..370d756d493 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -17,6 +17,8 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeClusterOptions;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -31,11 +33,13 @@ import net.sourceforge.argparse4j.inf.Subparsers;
import java.io.PrintStream;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
public class ClusterTool {
@@ -68,7 +72,9 @@ public class ClusterTool {
.help("Get information about the ID of a cluster.");
Subparser unregisterParser = subparsers.addParser("unregister")
.help("Unregister a broker.");
- for (Subparser subpparser : Arrays.asList(clusterIdParser,
unregisterParser)) {
+ Subparser listEndpoints = subparsers.addParser("list-endpoints")
+ .help("List endpoints");
+ for (Subparser subpparser : Arrays.asList(clusterIdParser,
unregisterParser, listEndpoints)) {
MutuallyExclusiveGroup connectionOptions =
subpparser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
@@ -85,6 +91,9 @@ public class ClusterTool {
.action(store())
.required(true)
.help("The ID of the broker to unregister.");
+ listEndpoints.addArgument("--include-fenced-brokers")
+ .action(storeTrue())
+ .help("Whether to include fenced brokers when listing broker
endpoints");
Namespace namespace = parser.parseArgsOrFail(args);
String command = namespace.getString("command");
@@ -108,6 +117,17 @@ public class ClusterTool {
}
break;
}
+ case "list-endpoints": {
+ try (Admin adminClient = Admin.create(properties)) {
+ boolean includeFencedBrokers =
Optional.of(namespace.getBoolean("include_fenced_brokers")).orElse(false);
+ boolean listControllerEndpoints =
namespace.getString("bootstrap_controller") != null;
+ if (includeFencedBrokers && listControllerEndpoints) {
+ throw new IllegalArgumentException("The option
--include-fenced-brokers is only supported with --bootstrap-server option");
+ }
+ listEndpoints(System.out, adminClient,
listControllerEndpoints, includeFencedBrokers);
+ }
+ break;
+ }
default:
throw new RuntimeException("Unknown command " + command);
}
@@ -135,4 +155,44 @@ public class ClusterTool {
}
}
}
+
+ static void listEndpoints(PrintStream stream, Admin adminClient, boolean
listControllerEndpoints, boolean includeFencedBrokers) throws Exception {
+ try {
+ DescribeClusterOptions option = new
DescribeClusterOptions().includeFencedBrokers(includeFencedBrokers);
+ Collection<Node> nodes =
adminClient.describeCluster(option).nodes().get();
+
+ String maxHostLength = String.valueOf(nodes.stream().map(node ->
node.host().length()).max(Integer::compareTo).orElse(100));
+ String maxRackLength = String.valueOf(nodes.stream().filter(node
-> node.hasRack()).map(node ->
node.rack().length()).max(Integer::compareTo).orElse(10));
+
+ if (listControllerEndpoints) {
+ String format = "%-10s %-" + maxHostLength + "s %-10s %-" +
maxRackLength + "s %-15s%n";
+ stream.printf(format, "ID", "HOST", "PORT", "RACK",
"ENDPOINT_TYPE");
+ nodes.stream().forEach(node -> stream.printf(format,
+ node.idString(),
+ node.host(),
+ node.port(),
+ node.rack(),
+ "controller"
+ ));
+ } else {
+ String format = "%-10s %-" + maxHostLength + "s %-10s %-" +
maxRackLength + "s %-10s %-15s%n";
+ stream.printf(format, "ID", "HOST", "PORT", "RACK", "STATE",
"ENDPOINT_TYPE");
+ nodes.stream().forEach(node -> stream.printf(format,
+ node.idString(),
+ node.host(),
+ node.port(),
+ node.rack(),
+ node.isFenced() ? "fenced" : "unfenced",
+ "broker"
+ ));
+ }
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof UnsupportedVersionException) {
+ stream.println(ee.getCause().getMessage());
+ } else {
+ throw ee;
+ }
+ }
+ }
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index 74919740170..aac6a3f48ff 100644
---
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -54,7 +54,7 @@ public class BrokerApiVersionsCommandTest {
BrokerApiVersionsCommand.mainNoExit("--bootstrap-server",
clusterInstance.bootstrapServers()));
Iterator<String> lineIter =
Arrays.stream(output.split("\n")).iterator();
assertTrue(lineIter.hasNext());
- assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null)
-> (", lineIter.next());
+ assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null
isFenced: false) -> (", lineIter.next());
ApiMessageType.ListenerType listenerType =
ApiMessageType.ListenerType.BROKER;
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index d27839269f4..b21795411e2 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -29,8 +29,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -61,6 +64,34 @@ public class ClusterToolTest {
assertTrue(output.contains("Broker " + brokerId + " is no longer
registered."));
}
+ @ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT})
+ public void testListEndpointsWithBootstrapServer(ClusterInstance
clusterInstance) {
+ String output = ToolsTestUtils.captureStandardOut(() ->
+ assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints",
"--bootstrap-server", clusterInstance.bootstrapServers())));
+ String port = clusterInstance.bootstrapServers().split(":")[1];
+ int id = clusterInstance.brokerIds().iterator().next();
+ String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s
%-10s %-10s %-6s";
+ String expected = String.format(format, "ID", "HOST", "PORT", "RACK",
"STATE", "ENDPOINT_TYPE", id, "localhost", port, "null", "unfenced", "broker");
+ assertEquals(expected, output);
+ }
+
+ @ClusterTest(brokers = 2, types = {Type.KRAFT, Type.CO_KRAFT})
+ public void testListEndpointsArgumentWithBootstrapServer(ClusterInstance
clusterInstance) {
+ List<Integer> brokerIds =
clusterInstance.brokerIds().stream().collect(Collectors.toList());
+ clusterInstance.shutdownBroker(brokerIds.get(0));
+
+ List<String> ports =
Arrays.stream(clusterInstance.bootstrapServers().split(",")).map(b ->
b.split(":")[1]).collect(Collectors.toList());
+ String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s
%-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
+ String expected = String.format(format,
+ "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE",
+ brokerIds.get(0), "localhost", ports.get(0), "null", "fenced",
"broker",
+ brokerIds.get(1), "localhost", ports.get(1), "null",
"unfenced", "broker");
+
+ String output = ToolsTestUtils.captureStandardOut(() ->
assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints",
"--bootstrap-server", clusterInstance.bootstrapServers(),
"--include-fenced-brokers")));
+
+ assertEquals(expected, output);
+ }
+
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testClusterIdWithBootstrapController(ClusterInstance
clusterInstance) {
String output = ToolsTestUtils.captureStandardOut(() ->
@@ -83,6 +114,25 @@ public class ClusterToolTest {
"the controller quorum.", exception.getCause().getMessage());
}
+ @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})
+ public void testListEndpointsWithBootstrapController(ClusterInstance
clusterInstance) {
+ String output = ToolsTestUtils.captureStandardOut(() ->
+ assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints",
"--bootstrap-controller", clusterInstance.bootstrapControllers())));
+ String port = clusterInstance.bootstrapControllers().split(":")[1];
+ int id = clusterInstance.controllerIds().iterator().next();
+ String format = "%-10s %-9s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s
%-10s";
+ String expected = String.format(format, "ID", "HOST", "PORT", "RACK",
"ENDPOINT_TYPE", id, "localhost", port, "null", "controller");
+ assertTrue(output.equals(expected));
+ }
+
+ @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})
+ public void
testListEndpointsArgumentWithBootstrapController(ClusterInstance
clusterInstance) {
+ RuntimeException exception =
+ assertThrows(RuntimeException.class,
+ () -> ClusterTool.execute("list-endpoints",
"--bootstrap-controller", clusterInstance.bootstrapControllers(),
"--include-fenced-brokers"));
+ assertEquals("The option --include-fenced-brokers is only supported
with --bootstrap-server option", exception.getMessage());
+ }
+
@Test
public void testPrintClusterId() throws Exception {
Admin adminClient = new MockAdminClient.Builder().