This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 747dc172e87 KIP-1073: Return fenced brokers in DescribeCluster response (#17524)
747dc172e87 is described below

commit 747dc172e874572ca2e3f917606028de205488d6
Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com>
AuthorDate: Fri Dec 13 02:58:11 2024 +0000

    KIP-1073: Return fenced brokers in DescribeCluster response (#17524)
    
    Implementation of KIP-1073: Return fenced brokers in DescribeCluster response.
    Add new unit and integration tests for describeCluster.
    
    Reviewers: Luke Chen <show...@gmail.com>
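
    For context, a minimal sketch of how a client could exercise the new option
    introduced by this commit. The bootstrap address is a placeholder and the
    class is illustrative, not part of this change:

        import java.util.Properties;

        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.DescribeClusterOptions;
        import org.apache.kafka.common.Node;

        public class ListFencedBrokersExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                // Placeholder bootstrap address; adjust for your cluster.
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    // Opt in to fenced brokers (requires DescribeCluster v2+ on the broker).
                    for (Node node : admin.describeCluster(
                            new DescribeClusterOptions().includeFencedBrokers(true)).nodes().get()) {
                        System.out.printf("broker %s %s:%d fenced=%b%n",
                                node.idString(), node.host(), node.port(), node.isFenced());
                    }
                }
            }
        }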
---
 .../clients/admin/DescribeClusterOptions.java      | 15 ++++++
 .../kafka/clients/admin/KafkaAdminClient.java      | 13 ++++-
 .../main/java/org/apache/kafka/common/Node.java    | 26 +++++++--
 .../common/requests/DescribeClusterResponse.java   |  2 +-
 .../common/message/DescribeClusterRequest.json     |  7 ++-
 .../common/message/DescribeClusterResponse.json    |  7 ++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 17 ++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  8 ++-
 .../main/scala/kafka/server/MetadataCache.scala    |  2 +
 .../kafka/server/metadata/KRaftMetadataCache.scala |  4 ++
 .../kafka/server/metadata/ZkMetadataCache.scala    |  4 ++
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 24 +++++++++
 .../kafka/api/SslAdminIntegrationTest.scala        | 23 +++++++-
 .../apache/kafka/metadata/BrokerRegistration.java  |  2 +-
 .../kafka/metadata/BrokerRegistrationTest.java     |  2 +-
 .../java/org/apache/kafka/tools/ClusterTool.java   | 62 +++++++++++++++++++++-
 .../kafka/tools/BrokerApiVersionsCommandTest.java  |  2 +-
 .../org/apache/kafka/tools/ClusterToolTest.java    | 50 +++++++++++++++++
 18 files changed, 252 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index 2eac1f055f6..a6ef6f7f0f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -29,6 +29,8 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
 
     private boolean includeAuthorizedOperations;
 
+    private boolean includeFencedBrokers;
+
     /**
      * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
      * AdminClient should be used.
@@ -45,6 +47,11 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
         return this;
     }
 
+    public DescribeClusterOptions includeFencedBrokers(boolean includeFencedBrokers) {
+        this.includeFencedBrokers = includeFencedBrokers;
+        return this;
+    }
+
     /**
      * Specify if authorized operations should be included in the response.  Note that some
      * older brokers cannot supply this information even if it is requested.
@@ -52,4 +59,12 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
     public boolean includeAuthorizedOperations() {
         return includeAuthorizedOperations;
     }
+
+    /**
+     * Specify if fenced brokers should be included in the response.  Note that some
+     * older brokers cannot supply this information even if it is requested.
+     */
+    public boolean includeFencedBrokers() {
+        return includeFencedBrokers;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index bd135acc74b..b7e482720b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2504,10 +2504,14 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
                 if (!useMetadataRequest) {
+                    if (metadataManager.usingBootstrapControllers() && options.includeFencedBrokers()) {
+                        throw new IllegalArgumentException("Cannot request fenced brokers from controller endpoint");
+                    }
                     return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
                         .setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
                         .setEndpointType(metadataManager.usingBootstrapControllers() ?
-                                EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()));
+                                EndpointType.CONTROLLER.id() : EndpointType.BROKER.id())
+                        .setIncludeFencedBrokers(options.includeFencedBrokers()));
                 } else {
                     // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
                     // simplifies communication with older brokers)
@@ -2523,7 +2527,6 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse abstractResponse) {
                 if (!useMetadataRequest) {
                     DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse;
-
                     Errors error = Errors.forCode(response.data().errorCode());
                     if (error != Errors.NONE) {
                         ApiError apiError = new ApiError(error, response.data().errorMessage());
@@ -2571,6 +2574,12 @@ public class KafkaAdminClient extends AdminClient {
                     return false;
                 }
 
+                // If the UnsupportedVersionException was caused by the option to include fenced brokers (only supported for version 2+),
+                // then we should not fall back to the MetadataRequest.
+                if (options.includeFencedBrokers()) {
+                    return false;
+                }
+
                 useMetadataRequest = true;
                 return true;
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 020d2bcaf33..e47d941e0f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -30,12 +30,13 @@ public class Node {
     private final String host;
     private final int port;
     private final String rack;
+    private final boolean isFenced;
 
     // Cache hashCode as it is called in performance sensitive parts of the code (e.g. RecordAccumulator.ready)
     private Integer hash;
 
     public Node(int id, String host, int port) {
-        this(id, host, port, null);
+        this(id, host, port, null, false);
     }
 
     public Node(int id, String host, int port, String rack) {
@@ -44,6 +45,16 @@ public class Node {
         this.host = host;
         this.port = port;
         this.rack = rack;
+        this.isFenced = false;
+    }
+
+    public Node(int id, String host, int port, String rack, boolean isFenced) {
+        this.id = id;
+        this.idString = Integer.toString(id);
+        this.host = host;
+        this.port = port;
+        this.rack = rack;
+        this.isFenced = isFenced;
     }
 
     public static Node noNode() {
@@ -102,6 +113,13 @@ public class Node {
         return rack;
     }
 
+    /**
+     * Whether this node is fenced.
+     */
+    public boolean isFenced() {
+        return isFenced;
+    }
+
     @Override
     public int hashCode() {
         Integer h = this.hash;
@@ -110,6 +128,7 @@ public class Node {
             result = 31 * result + id;
             result = 31 * result + port;
             result = 31 * result + ((rack == null) ? 0 : rack.hashCode());
+            result = 31 * result + Objects.hashCode(isFenced);
             this.hash = result;
             return result;
         } else {
@@ -127,12 +146,13 @@ public class Node {
         return id == other.id &&
             port == other.port &&
             Objects.equals(host, other.host) &&
-            Objects.equals(rack, other.rack);
+            Objects.equals(rack, other.rack) &&
+            Objects.equals(isFenced, other.isFenced);
     }
 
     @Override
     public String toString() {
-        return host + ":" + port + " (id: " + idString + " rack: " + rack + ")";
+        return host + ":" + port + " (id: " + idString + " rack: " + rack + " isFenced: " + isFenced + ")";
     }
 
 }
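
The widened Node constructor behaves as a plain value extension: isFenced participates in equals(), hashCode(), and toString(). A small self-contained sketch with hypothetical example values:

    import org.apache.kafka.common.Node;

    public class NodeFencingDemo {
        public static void main(String[] args) {
            // Two nodes identical except for the fencing flag.
            Node live = new Node(1, "broker-1", 9092, "rack-a", false);
            Node fenced = new Node(1, "broker-1", 9092, "rack-a", true);

            System.out.println(live.equals(fenced)); // false: isFenced is part of equality
            System.out.println(fenced);              // broker-1:9092 (id: 1 rack: rack-a isFenced: true)
        }
    }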
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
index 4964a8a8a9d..7c892874214 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
@@ -39,7 +39,7 @@ public class DescribeClusterResponse extends AbstractResponse {
 
     public Map<Integer, Node> nodes() {
         return data.brokers().valuesList().stream()
-            .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack()))
+            .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack(), b.isFenced()))
             .collect(Collectors.toMap(Node::id, Function.identity()));
     }
 
diff --git a/clients/src/main/resources/common/message/DescribeClusterRequest.json b/clients/src/main/resources/common/message/DescribeClusterRequest.json
index 34ebe013bb1..71e00df09b2 100644
--- a/clients/src/main/resources/common/message/DescribeClusterRequest.json
+++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json
@@ -20,13 +20,16 @@
   "name": "DescribeClusterRequest",
   //
   // Version 1 adds EndpointType for KIP-919 support.
+  // Version 2 adds IncludeFencedBrokers for KIP-1073 support.
   //
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "IncludeClusterAuthorizedOperations", "type": "bool", 
"versions": "0+",
       "about": "Whether to include cluster authorized operations." },
     { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1",
-      "about": "The endpoint type to describe. 1=brokers, 2=controllers." }
+      "about": "The endpoint type to describe. 1=brokers, 2=controllers." },
+    { "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+",
+      "about": "Whether to include fenced brokers when listing brokers." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/DescribeClusterResponse.json b/clients/src/main/resources/common/message/DescribeClusterResponse.json
index cd30dcfe18c..a17e427c8c3 100644
--- a/clients/src/main/resources/common/message/DescribeClusterResponse.json
+++ b/clients/src/main/resources/common/message/DescribeClusterResponse.json
@@ -20,8 +20,9 @@
   //
   // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and
   // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes.
+  // Version 2 adds IsFenced field to Brokers for KIP-1073 support.
   //
-  "validVersions": "0-1",
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -45,7 +46,9 @@
       { "name": "Port", "type": "int32", "versions": "0+",
         "about": "The broker port." },
       { "name": "Rack", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-        "about": "The rack of the broker, or null if it has not been assigned 
to a rack." }
+        "about": "The rack of the broker, or null if it has not been assigned 
to a rack." },
+      { "name": "IsFenced", "type": "bool", "versions": "2+",
+        "about": "Whether the broker is fenced" }
     ]},
     { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": 
"0+", "default": "-2147483648",
       "about": "32-bit bitfield to represent authorized operations for this 
cluster." }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 44f6e1f5a88..1d1ae3e884b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3166,6 +3166,23 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers() {
+        ApiVersion describeClusterV1 = new ApiVersion()
+            .setApiKey(ApiKeys.DESCRIBE_CLUSTER.id)
+            .setMinVersion((short) 0)
+            .setMaxVersion((short) 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(describeClusterV1)));
+
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                    request -> request instanceof DescribeClusterRequest);
+
+            final DescribeClusterResult result = env.adminClient().describeCluster(new DescribeClusterOptions().includeFencedBrokers(true));
+            TestUtils.assertFutureThrows(result.nodes(), UnsupportedVersionException.class);
+        }
+    }
+
     @Test
     public void testListConsumerGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 566b04dfd25..405caf240b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3617,12 +3617,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       clusterId,
       () => {
         val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection()
-        metadataCache.getAliveBrokerNodes(request.context.listenerName).foreach { node =>
+        val describeClusterRequest = request.body[DescribeClusterRequest]
+        metadataCache.getBrokerNodes(request.context.listenerName).foreach { node =>
+          if (!node.isFenced || describeClusterRequest.data().includeFencedBrokers()) {
           brokers.add(new DescribeClusterResponseData.DescribeClusterBroker().
             setBrokerId(node.id).
             setHost(node.host).
             setPort(node.port).
-            setRack(node.rack))
+            setRack(node.rack).
+            setIsFenced(node.isFenced))
+          }
         }
         brokers
       },
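
The filter above admits a node when it is unfenced, or when the request opted in via IncludeFencedBrokers. A minimal Java sketch of the same predicate, using a hypothetical helper name purely for illustration:

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.kafka.common.Node;

    final class BrokerVisibility {
        // Mirrors the KafkaApis check: fenced nodes appear only on request.
        static List<Node> visible(List<Node> brokers, boolean includeFencedBrokers) {
            return brokers.stream()
                    .filter(node -> !node.isFenced() || includeFencedBrokers)
                    .collect(Collectors.toList());
        }
    }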
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index c98431c44e9..562c9d0ce4a 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -75,6 +75,8 @@ trait MetadataCache {
 
   def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node]
 
+  def getBrokerNodes(listenerName: ListenerName): Iterable[Node]
+
   def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
   /**
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 5fad48f8a71..5d7484714d0 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -381,6 +381,10 @@ class KRaftMetadataCache(
       flatMap(_.node(listenerName.value()).toScala).toSeq
   }
 
+  override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = {
+    _currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).asScala).toSeq
+  }
+
   // Does NOT include offline replica metadata
   override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
     Option(_currentImage.topics().getTopic(topicName)).
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 36684f7ef0f..d7f1d868466 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -353,6 +353,10 @@ class ZkMetadataCache(
     metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName))
   }
 
+  override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
+    getAliveBrokerNodes(listenerName)
+  }
+
   def getTopicId(topicName: String): Uuid = {
     metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID)
   }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index bd381f0306e..cff801b2b50 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -595,6 +595,30 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testListNodesWithFencedBroker(quorum: String): Unit = {
+    client = createAdminClient
+    val fencedBrokerId = brokers.last.config.brokerId
+    killBroker(fencedBrokerId, JDuration.ofMillis(0))
+    // It takes a few seconds for a broker to get fenced after being killed,
+    // so we retry until only 2 of the 3 brokers are returned in the result or the max wait is reached
+    TestUtils.retry(20000) {
+      assertTrue(client.describeCluster().nodes().get().asScala.size.equals(brokers.size - 1))
+    }
+
+    // List nodes again, but this time include the fenced broker
+    val nodes = client.describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)).nodes().get().asScala
+    assertTrue(nodes.size.equals(brokers.size))
+    nodes.foreach(node => {
+      if (node.id().equals(fencedBrokerId)) {
+        assertTrue(node.isFenced)
+      } else {
+        assertFalse(node.isFenced)
+      }
+    })
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 5063e79ad08..5fcc0449bb5 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -18,7 +18,7 @@ import java.util.Properties
 import com.yammer.metrics.core.Gauge
 import kafka.security.JaasTestUtils
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult}
+import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult, DescribeClusterOptions}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -32,7 +32,7 @@ import org.apache.kafka.common.network.ConnectionMode
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertThrows, assertTrue}
 import org.junit.jupiter.api.{AfterEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -158,6 +158,25 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testListNodesFromControllersIncludingFencedBrokers(quorum: String): Unit = {
+    useBoostrapControllers()
+    client = createAdminClient
+    val result = client.describeCluster(new DescribeClusterOptions().includeFencedBrokers(true))
+    val exception = assertThrows(classOf[Exception], () => { result.nodes().get()})
+    assertTrue(exception.getCause.getCause.getMessage.contains("Cannot request fenced brokers from controller endpoint"))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testListNodesFromControllers(quorum: String): Unit = {
+    useBoostrapControllers()
+    client = createAdminClient
+    val result = client.describeCluster(new DescribeClusterOptions())
+    assertTrue(result.nodes().get().size().equals(controllerServers.size))
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 2bd77f06e11..a7409da27a4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -233,7 +233,7 @@ public class BrokerRegistration {
         if (endpoint == null) {
             return Optional.empty();
         }
-        return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null)));
+        return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced));
     }
 
     public Map<String, VersionRange> supportedFeatures() {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 485a39aad8b..e45234d225e 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -159,7 +159,7 @@ public class BrokerRegistrationTest {
         assertEquals(Optional.empty(), REGISTRATIONS.get(0).node("NONEXISTENT"));
         assertEquals(Optional.of(new Node(0, "localhost", 9090, null)),
             REGISTRATIONS.get(0).node("INTERNAL"));
-        assertEquals(Optional.of(new Node(1, "localhost", 9091, null)),
+        assertEquals(Optional.of(new Node(1, "localhost", 9091, null, true)),
             REGISTRATIONS.get(1).node("INTERNAL"));
         assertEquals(Optional.of(new Node(2, "localhost", 9092, "myrack")),
             REGISTRATIONS.get(2).node("INTERNAL"));
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index f4699d221e2..370d756d493 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeClusterOptions;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
@@ -31,11 +33,13 @@ import net.sourceforge.argparse4j.inf.Subparsers;
 
 import java.io.PrintStream;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
 public class ClusterTool {
 
@@ -68,7 +72,9 @@ public class ClusterTool {
                 .help("Get information about the ID of a cluster.");
         Subparser unregisterParser = subparsers.addParser("unregister")
                 .help("Unregister a broker.");
-        for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) {
+        Subparser listEndpoints = subparsers.addParser("list-endpoints")
+                .help("List endpoints.");
+        for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) {
             MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
             connectionOptions.addArgument("--bootstrap-server", "-b")
                     .action(store())
@@ -85,6 +91,9 @@ public class ClusterTool {
                 .action(store())
                 .required(true)
                 .help("The ID of the broker to unregister.");
+        listEndpoints.addArgument("--include-fenced-brokers")
+                .action(storeTrue())
+                .help("Whether to include fenced brokers when listing broker endpoints.");
 
         Namespace namespace = parser.parseArgsOrFail(args);
         String command = namespace.getString("command");
@@ -108,6 +117,17 @@ public class ClusterTool {
                 }
                 break;
             }
+            case "list-endpoints": {
+                try (Admin adminClient = Admin.create(properties)) {
+                    boolean includeFencedBrokers = Optional.of(namespace.getBoolean("include_fenced_brokers")).orElse(false);
+                    boolean listControllerEndpoints = namespace.getString("bootstrap_controller") != null;
+                    if (includeFencedBrokers && listControllerEndpoints) {
+                        throw new IllegalArgumentException("The option --include-fenced-brokers is only supported with --bootstrap-server option");
+                    }
+                    listEndpoints(System.out, adminClient, listControllerEndpoints, includeFencedBrokers);
+                }
+                break;
+            }
             default:
                 throw new RuntimeException("Unknown command " + command);
         }
@@ -135,4 +155,44 @@ public class ClusterTool {
             }
         }
     }
+
+    static void listEndpoints(PrintStream stream, Admin adminClient, boolean listControllerEndpoints, boolean includeFencedBrokers) throws Exception {
+        try {
+            DescribeClusterOptions option = new DescribeClusterOptions().includeFencedBrokers(includeFencedBrokers);
+            Collection<Node> nodes = adminClient.describeCluster(option).nodes().get();
+
+            String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100));
+            String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
+
+            if (listControllerEndpoints) {
+                String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
+                stream.printf(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE");
+                nodes.stream().forEach(node -> stream.printf(format,
+                        node.idString(),
+                        node.host(),
+                        node.port(),
+                        node.rack(),
+                        "controller"
+                ));
+            } else {
+                String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-10s %-15s%n";
+                stream.printf(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE");
+                nodes.stream().forEach(node -> stream.printf(format,
+                        node.idString(),
+                        node.host(),
+                        node.port(),
+                        node.rack(),
+                        node.isFenced() ? "fenced" : "unfenced",
+                        "broker"
+                ));
+            }
+        } catch (ExecutionException ee) {
+            Throwable cause = ee.getCause();
+            if (cause instanceof UnsupportedVersionException) {
+                stream.println(ee.getCause().getMessage());
+            } else {
+                throw ee;
+            }
+        }
+    }
 }
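
For reference, the new subcommand would be driven roughly as follows, assuming the standard bin/kafka-cluster.sh wrapper; the address and the rendered column widths (computed from the node list, as above) are illustrative:

    $ bin/kafka-cluster.sh list-endpoints --bootstrap-server localhost:9092 --include-fenced-brokers
    ID         HOST      PORT       RACK       STATE      ENDPOINT_TYPE
    0          localhost 9092       null       unfenced   broker
    1          localhost 9093       null       fenced     broker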
diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index 74919740170..aac6a3f48ff 100644
--- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -54,7 +54,7 @@ public class BrokerApiVersionsCommandTest {
                 BrokerApiVersionsCommand.mainNoExit("--bootstrap-server", clusterInstance.bootstrapServers()));
         Iterator<String> lineIter = Arrays.stream(output.split("\n")).iterator();
         assertTrue(lineIter.hasNext());
-        assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null) -> (", lineIter.next());
+        assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null isFenced: false) -> (", lineIter.next());
 
         ApiMessageType.ListenerType listenerType = ApiMessageType.ListenerType.BROKER;
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index d27839269f4..b21795411e2 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -29,8 +29,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -61,6 +64,34 @@ public class ClusterToolTest {
         assertTrue(output.contains("Broker " + brokerId + " is no longer 
registered."));
     }
 
+    @ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT})
+    public void testListEndpointsWithBootstrapServer(ClusterInstance clusterInstance) {
+        String output = ToolsTestUtils.captureStandardOut(() ->
+                assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers())));
+        String port = clusterInstance.bootstrapServers().split(":")[1];
+        int id = clusterInstance.brokerIds().iterator().next();
+        String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
+        String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE", id, "localhost", port, "null", "unfenced", "broker");
+        assertEquals(expected, output);
+    }
+
+    @ClusterTest(brokers = 2, types = {Type.KRAFT, Type.CO_KRAFT})
+    public void testListEndpointsArgumentWithBootstrapServer(ClusterInstance clusterInstance) {
+        List<Integer> brokerIds = clusterInstance.brokerIds().stream().collect(Collectors.toList());
+        clusterInstance.shutdownBroker(brokerIds.get(0));
+
+        List<String> ports = Arrays.stream(clusterInstance.bootstrapServers().split(",")).map(b -> b.split(":")[1]).collect(Collectors.toList());
+        String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s";
+        String expected = String.format(format,
+                "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE",
+                brokerIds.get(0), "localhost", ports.get(0), "null", "fenced", "broker",
+                brokerIds.get(1), "localhost", ports.get(1), "null", "unfenced", "broker");
+
+        String output = ToolsTestUtils.captureStandardOut(() -> assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers(), "--include-fenced-brokers")));
+
+        assertEquals(expected, output);
+    }
+
     @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
     public void testClusterIdWithBootstrapController(ClusterInstance clusterInstance) {
         String output = ToolsTestUtils.captureStandardOut(() ->
@@ -83,6 +114,25 @@ public class ClusterToolTest {
                 "the controller quorum.", exception.getCause().getMessage());
     }
 
+    @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})
+    public void testListEndpointsWithBootstrapController(ClusterInstance clusterInstance) {
+        String output = ToolsTestUtils.captureStandardOut(() ->
+                assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-controller", clusterInstance.bootstrapControllers())));
+        String port = clusterInstance.bootstrapControllers().split(":")[1];
+        int id = clusterInstance.controllerIds().iterator().next();
+        String format = "%-10s %-9s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s";
+        String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE", id, "localhost", port, "null", "controller");
+        assertTrue(output.equals(expected));
+    }
+
+    @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})
+    public void testListEndpointsArgumentWithBootstrapController(ClusterInstance clusterInstance) {
+        RuntimeException exception =
+                assertThrows(RuntimeException.class,
+                        () -> ClusterTool.execute("list-endpoints", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--include-fenced-brokers"));
+        assertEquals("The option --include-fenced-brokers is only supported with --bootstrap-server option", exception.getMessage());
+    }
+
     @Test
     public void testPrintClusterId() throws Exception {
         Admin adminClient = new MockAdminClient.Builder().
