This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 747dc172e87 KIP-1073: Return fenced brokers in DescribeCluster response (#17524) 747dc172e87 is described below commit 747dc172e874572ca2e3f917606028de205488d6 Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com> AuthorDate: Fri Dec 13 02:58:11 2024 +0000 KIP-1073: Return fenced brokers in DescribeCluster response (#17524) Implementation of KIP-1073: Return fenced brokers in DescribeCluster response. Add new unit and integration tests for describeCluster. Reviewers: Luke Chen <show...@gmail.com> --- .../clients/admin/DescribeClusterOptions.java | 15 ++++++ .../kafka/clients/admin/KafkaAdminClient.java | 13 ++++- .../main/java/org/apache/kafka/common/Node.java | 26 +++++++-- .../common/requests/DescribeClusterResponse.java | 2 +- .../common/message/DescribeClusterRequest.json | 7 ++- .../common/message/DescribeClusterResponse.json | 7 ++- .../kafka/clients/admin/KafkaAdminClientTest.java | 17 ++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++- .../main/scala/kafka/server/MetadataCache.scala | 2 + .../kafka/server/metadata/KRaftMetadataCache.scala | 4 ++ .../kafka/server/metadata/ZkMetadataCache.scala | 4 ++ .../kafka/api/PlaintextAdminIntegrationTest.scala | 24 +++++++++ .../kafka/api/SslAdminIntegrationTest.scala | 23 +++++++- .../apache/kafka/metadata/BrokerRegistration.java | 2 +- .../kafka/metadata/BrokerRegistrationTest.java | 2 +- .../java/org/apache/kafka/tools/ClusterTool.java | 62 +++++++++++++++++++++- .../kafka/tools/BrokerApiVersionsCommandTest.java | 2 +- .../org/apache/kafka/tools/ClusterToolTest.java | 50 +++++++++++++++++ 18 files changed, 252 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index 2eac1f055f6..a6ef6f7f0f3 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -29,6 +29,8 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio private boolean includeAuthorizedOperations; + private boolean includeFencedBrokers; + /** * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. @@ -45,6 +47,11 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio return this; } + public DescribeClusterOptions includeFencedBrokers(boolean includeFencedBrokers) { + this.includeFencedBrokers = includeFencedBrokers; + return this; + } + /** * Specify if authorized operations should be included in the response. Note that some * older brokers cannot not supply this information even if it is requested. @@ -52,4 +59,12 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio public boolean includeAuthorizedOperations() { return includeAuthorizedOperations; } + + /** + * Specify if fenced brokers should be included in the response. Note that some + * older brokers cannot not supply this information even if it is requested. 
+ */ + public boolean includeFencedBrokers() { + return includeFencedBrokers; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index bd135acc74b..b7e482720b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2504,10 +2504,14 @@ public class KafkaAdminClient extends AdminClient { @Override AbstractRequest.Builder createRequest(int timeoutMs) { if (!useMetadataRequest) { + if (metadataManager.usingBootstrapControllers() && options.includeFencedBrokers()) { + throw new IllegalArgumentException("Cannot request fenced brokers from controller endpoint"); + } return new DescribeClusterRequest.Builder(new DescribeClusterRequestData() .setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations()) .setEndpointType(metadataManager.usingBootstrapControllers() ? 
- EndpointType.CONTROLLER.id() : EndpointType.BROKER.id())); + EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()) + .setIncludeFencedBrokers(options.includeFencedBrokers())); } else { // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it // simplifies communication with older brokers) @@ -2523,7 +2527,6 @@ public class KafkaAdminClient extends AdminClient { void handleResponse(AbstractResponse abstractResponse) { if (!useMetadataRequest) { DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse; - Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { ApiError apiError = new ApiError(error, response.data().errorMessage()); @@ -2571,6 +2574,12 @@ public class KafkaAdminClient extends AdminClient { return false; } + // If unsupportedVersion exception was caused by the option to include fenced brokers (only supported for version 2+) + // then we should not fall back to the metadataRequest. + if (options.includeFencedBrokers()) { + return false; + } + useMetadataRequest = true; return true; } diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 020d2bcaf33..e47d941e0f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -30,12 +30,13 @@ public class Node { private final String host; private final int port; private final String rack; + private final boolean isFenced; // Cache hashCode as it is called in performance sensitive parts of the code (e.g. 
RecordAccumulator.ready) private Integer hash; public Node(int id, String host, int port) { - this(id, host, port, null); + this(id, host, port, null, false); } public Node(int id, String host, int port, String rack) { @@ -44,6 +45,16 @@ public class Node { this.host = host; this.port = port; this.rack = rack; + this.isFenced = false; + } + + public Node(int id, String host, int port, String rack, boolean isFenced) { + this.id = id; + this.idString = Integer.toString(id); + this.host = host; + this.port = port; + this.rack = rack; + this.isFenced = isFenced; } public static Node noNode() { @@ -102,6 +113,13 @@ public class Node { return rack; } + /** + * Whether if this node is fenced + */ + public boolean isFenced() { + return isFenced; + } + @Override public int hashCode() { Integer h = this.hash; @@ -110,6 +128,7 @@ public class Node { result = 31 * result + id; result = 31 * result + port; result = 31 * result + ((rack == null) ? 0 : rack.hashCode()); + result = 31 * result + Objects.hashCode(isFenced); this.hash = result; return result; } else { @@ -127,12 +146,13 @@ public class Node { return id == other.id && port == other.port && Objects.equals(host, other.host) && - Objects.equals(rack, other.rack); + Objects.equals(rack, other.rack) && + Objects.equals(isFenced, other.isFenced); } @Override public String toString() { - return host + ":" + port + " (id: " + idString + " rack: " + rack + ")"; + return host + ":" + port + " (id: " + idString + " rack: " + rack + " isFenced: " + isFenced + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java index 4964a8a8a9d..7c892874214 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java @@ -39,7 +39,7 @@ public class DescribeClusterResponse extends 
AbstractResponse { public Map<Integer, Node> nodes() { return data.brokers().valuesList().stream() - .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack())) + .map(b -> new Node(b.brokerId(), b.host(), b.port(), b.rack(), b.isFenced())) .collect(Collectors.toMap(Node::id, Function.identity())); } diff --git a/clients/src/main/resources/common/message/DescribeClusterRequest.json b/clients/src/main/resources/common/message/DescribeClusterRequest.json index 34ebe013bb1..71e00df09b2 100644 --- a/clients/src/main/resources/common/message/DescribeClusterRequest.json +++ b/clients/src/main/resources/common/message/DescribeClusterRequest.json @@ -20,13 +20,16 @@ "name": "DescribeClusterRequest", // // Version 1 adds EndpointType for KIP-919 support. + // Version 2 adds IncludeFencedBrokers for KIP-1073 support. // - "validVersions": "0-1", + "validVersions": "0-2", "flexibleVersions": "0+", "fields": [ { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+", "about": "Whether to include cluster authorized operations." }, { "name": "EndpointType", "type": "int8", "versions": "1+", "default": "1", - "about": "The endpoint type to describe. 1=brokers, 2=controllers." } + "about": "The endpoint type to describe. 1=brokers, 2=controllers." }, + { "name": "IncludeFencedBrokers", "type": "bool", "versions": "2+", + "about": "Whether to include fenced brokers when listing brokers." } ] } diff --git a/clients/src/main/resources/common/message/DescribeClusterResponse.json b/clients/src/main/resources/common/message/DescribeClusterResponse.json index cd30dcfe18c..a17e427c8c3 100644 --- a/clients/src/main/resources/common/message/DescribeClusterResponse.json +++ b/clients/src/main/resources/common/message/DescribeClusterResponse.json @@ -20,8 +20,9 @@ // // Version 1 adds the EndpointType field, and makes MISMATCHED_ENDPOINT_TYPE and // UNSUPPORTED_ENDPOINT_TYPE valid top-level response error codes. 
+ // Version 2 adds IsFenced field to Brokers for KIP-1073 support. // - "validVersions": "0-1", + "validVersions": "0-2", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", @@ -45,7 +46,9 @@ { "name": "Port", "type": "int32", "versions": "0+", "about": "The broker port." }, { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The rack of the broker, or null if it has not been assigned to a rack." } + "about": "The rack of the broker, or null if it has not been assigned to a rack." }, + { "name": "IsFenced", "type": "bool", "versions": "2+", + "about": "Whether the broker is fenced" } ]}, { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this cluster." } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 44f6e1f5a88..1d1ae3e884b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3166,6 +3166,23 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers() { + ApiVersion describeClusterV1 = new ApiVersion() + .setApiKey(ApiKeys.DESCRIBE_CLUSTER.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(describeClusterV1))); + + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof DescribeClusterRequest); + + final DescribeClusterResult result = env.adminClient().describeCluster(new 
DescribeClusterOptions().includeFencedBrokers(true)); + TestUtils.assertFutureThrows(result.nodes(), UnsupportedVersionException.class); + } + } + @Test public void testListConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 566b04dfd25..405caf240b8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3617,12 +3617,16 @@ class KafkaApis(val requestChannel: RequestChannel, clusterId, () => { val brokers = new DescribeClusterResponseData.DescribeClusterBrokerCollection() - metadataCache.getAliveBrokerNodes(request.context.listenerName).foreach { node => + val describeClusterRequest = request.body[DescribeClusterRequest] + metadataCache.getBrokerNodes(request.context.listenerName).foreach { node => + if (!node.isFenced || describeClusterRequest.data().includeFencedBrokers()) { brokers.add(new DescribeClusterResponseData.DescribeClusterBroker(). setBrokerId(node.id). setHost(node.host). setPort(node.port). - setRack(node.rack)) + setRack(node.rack). 
+ setIsFenced(node.isFenced)) + } } brokers }, diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index c98431c44e9..562c9d0ce4a 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -75,6 +75,8 @@ trait MetadataCache { def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] + def getBrokerNodes(listenerName: ListenerName): Iterable[Node] + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] /** diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 5fad48f8a71..5d7484714d0 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -381,6 +381,10 @@ class KRaftMetadataCache( flatMap(_.node(listenerName.value()).toScala).toSeq } + override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = { + _currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).asScala).toSeq + } + // Does NOT include offline replica metadata override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = { Option(_currentImage.topics().getTopic(topicName)). 
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index 36684f7ef0f..d7f1d868466 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -353,6 +353,10 @@ class ZkMetadataCache( metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName)) } + override def getBrokerNodes(listenerName: ListenerName): Iterable[Node] = { + getAliveBrokerNodes(listenerName) + } + def getTopicId(topicName: String): Uuid = { metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index bd381f0306e..cff801b2b50 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -595,6 +595,30 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(",")) } + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testListNodesWithFencedBroker(quorum: String): Unit = { + client = createAdminClient + val fencedBrokerId = brokers.last.config.brokerId + killBroker(fencedBrokerId, JDuration.ofMillis(0)) + // It takes a few seconds for a broker to get fenced after being killed + // So we retry until only 2 of 3 brokers returned in the result or the max wait is reached + TestUtils.retry(20000) { + assertTrue(client.describeCluster().nodes().get().asScala.size.equals(brokers.size - 1)) + } + + // List nodes again but this time include the fenced broker + val nodes = client.describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)).nodes().get().asScala + assertTrue(nodes.size.equals(brokers.size)) + nodes.foreach(node => { + 
if (node.id().equals(fencedBrokerId)) { + assertTrue(node.isFenced) + } else { + assertFalse(node.isFenced) + } + }) + } + @ParameterizedTest @ValueSource(strings = Array("kraft")) def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 5063e79ad08..5fcc0449bb5 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -18,7 +18,7 @@ import java.util.Properties import com.yammer.metrics.core.Gauge import kafka.security.JaasTestUtils import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult} +import org.apache.kafka.clients.admin.{AdminClientConfig, CreateAclsResult, DescribeClusterOptions} import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs @@ -32,7 +32,7 @@ import org.apache.kafka.common.network.ConnectionMode import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer} import org.apache.kafka.server.metrics.KafkaYammerMetrics -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertThrows, assertTrue} import org.junit.jupiter.api.{AfterEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -158,6 +158,25 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { super.tearDown() } + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testListNodesFromControllersIncludingFencedBrokers(quorum: String): Unit = { + useBoostrapControllers() + 
client = createAdminClient + val result = client.describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)) + val exception = assertThrows(classOf[Exception], () => { result.nodes().get()}) + assertTrue(exception.getCause.getCause.getMessage.contains("Cannot request fenced brokers from controller endpoint")) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testListNodesFromControllers(quorum: String): Unit = { + useBoostrapControllers() + client = createAdminClient + val result = client.describeCluster(new DescribeClusterOptions()) + assertTrue(result.nodes().get().size().equals(controllerServers.size)) + } + @ParameterizedTest @ValueSource(strings = Array("kraft")) def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java index 2bd77f06e11..a7409da27a4 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java @@ -233,7 +233,7 @@ public class BrokerRegistration { if (endpoint == null) { return Optional.empty(); } - return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null))); + return Optional.of(new Node(id, endpoint.host(), endpoint.port(), rack.orElse(null), fenced)); } public Map<String, VersionRange> supportedFeatures() { diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java index 485a39aad8b..e45234d225e 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java @@ -159,7 +159,7 @@ public class BrokerRegistrationTest { assertEquals(Optional.empty(), REGISTRATIONS.get(0).node("NONEXISTENT")); 
assertEquals(Optional.of(new Node(0, "localhost", 9090, null)), REGISTRATIONS.get(0).node("INTERNAL")); - assertEquals(Optional.of(new Node(1, "localhost", 9091, null)), + assertEquals(Optional.of(new Node(1, "localhost", 9091, null, true)), REGISTRATIONS.get(1).node("INTERNAL")); assertEquals(Optional.of(new Node(2, "localhost", 9092, "myrack")), REGISTRATIONS.get(2).node("INTERNAL")); diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java index f4699d221e2..370d756d493 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java @@ -17,6 +17,8 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -31,11 +33,13 @@ import net.sourceforge.argparse4j.inf.Subparsers; import java.io.PrintStream; import java.util.Arrays; +import java.util.Collection; import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; public class ClusterTool { @@ -68,7 +72,9 @@ public class ClusterTool { .help("Get information about the ID of a cluster."); Subparser unregisterParser = subparsers.addParser("unregister") .help("Unregister a broker."); - for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) { + Subparser listEndpoints = subparsers.addParser("list-endpoints") + .help("List endpoints"); + for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) { MutuallyExclusiveGroup connectionOptions = 
subpparser.addMutuallyExclusiveGroup().required(true); connectionOptions.addArgument("--bootstrap-server", "-b") .action(store()) @@ -85,6 +91,9 @@ public class ClusterTool { .action(store()) .required(true) .help("The ID of the broker to unregister."); + listEndpoints.addArgument("--include-fenced-brokers") + .action(storeTrue()) + .help("Whether to include fenced brokers when listing broker endpoints"); Namespace namespace = parser.parseArgsOrFail(args); String command = namespace.getString("command"); @@ -108,6 +117,17 @@ public class ClusterTool { } break; } + case "list-endpoints": { + try (Admin adminClient = Admin.create(properties)) { + boolean includeFencedBrokers = Optional.of(namespace.getBoolean("include_fenced_brokers")).orElse(false); + boolean listControllerEndpoints = namespace.getString("bootstrap_controller") != null; + if (includeFencedBrokers && listControllerEndpoints) { + throw new IllegalArgumentException("The option --include-fenced-brokers is only supported with --bootstrap-server option"); + } + listEndpoints(System.out, adminClient, listControllerEndpoints, includeFencedBrokers); + } + break; + } default: throw new RuntimeException("Unknown command " + command); } @@ -135,4 +155,44 @@ public class ClusterTool { } } } + + static void listEndpoints(PrintStream stream, Admin adminClient, boolean listControllerEndpoints, boolean includeFencedBrokers) throws Exception { + try { + DescribeClusterOptions option = new DescribeClusterOptions().includeFencedBrokers(includeFencedBrokers); + Collection<Node> nodes = adminClient.describeCluster(option).nodes().get(); + + String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100)); + String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10)); + + if (listControllerEndpoints) { + String format = "%-10s %-" + maxHostLength + "s %-10s %-" + 
maxRackLength + "s %-15s%n"; + stream.printf(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE"); + nodes.stream().forEach(node -> stream.printf(format, + node.idString(), + node.host(), + node.port(), + node.rack(), + "controller" + )); + } else { + String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-10s %-15s%n"; + stream.printf(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE"); + nodes.stream().forEach(node -> stream.printf(format, + node.idString(), + node.host(), + node.port(), + node.rack(), + node.isFenced() ? "fenced" : "unfenced", + "broker" + )); + } + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof UnsupportedVersionException) { + stream.println(ee.getCause().getMessage()); + } else { + throw ee; + } + } + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java index 74919740170..aac6a3f48ff 100644 --- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java @@ -54,7 +54,7 @@ public class BrokerApiVersionsCommandTest { BrokerApiVersionsCommand.mainNoExit("--bootstrap-server", clusterInstance.bootstrapServers())); Iterator<String> lineIter = Arrays.stream(output.split("\n")).iterator(); assertTrue(lineIter.hasNext()); - assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null) -> (", lineIter.next()); + assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null isFenced: false) -> (", lineIter.next()); ApiMessageType.ListenerType listenerType = ApiMessageType.ListenerType.BROKER; diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java index d27839269f4..b21795411e2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java 
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java @@ -29,8 +29,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -61,6 +64,34 @@ public class ClusterToolTest { assertTrue(output.contains("Broker " + brokerId + " is no longer registered.")); } + @ClusterTest(brokers = 1, types = {Type.KRAFT, Type.CO_KRAFT}) + public void testListEndpointsWithBootstrapServer(ClusterInstance clusterInstance) { + String output = ToolsTestUtils.captureStandardOut(() -> + assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers()))); + String port = clusterInstance.bootstrapServers().split(":")[1]; + int id = clusterInstance.brokerIds().iterator().next(); + String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s"; + String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE", id, "localhost", port, "null", "unfenced", "broker"); + assertEquals(expected, output); + } + + @ClusterTest(brokers = 2, types = {Type.KRAFT, Type.CO_KRAFT}) + public void testListEndpointsArgumentWithBootstrapServer(ClusterInstance clusterInstance) { + List<Integer> brokerIds = clusterInstance.brokerIds().stream().collect(Collectors.toList()); + clusterInstance.shutdownBroker(brokerIds.get(0)); + + List<String> ports = Arrays.stream(clusterInstance.bootstrapServers().split(",")).map(b -> b.split(":")[1]).collect(Collectors.toList()); + String format = "%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s %-6s"; + String expected = String.format(format, + "ID", "HOST", 
"PORT", "RACK", "STATE", "ENDPOINT_TYPE", + brokerIds.get(0), "localhost", ports.get(0), "null", "fenced", "broker", + brokerIds.get(1), "localhost", ports.get(1), "null", "unfenced", "broker"); + + String output = ToolsTestUtils.captureStandardOut(() -> assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-server", clusterInstance.bootstrapServers(), "--include-fenced-brokers"))); + + assertEquals(expected, output); + } + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}) public void testClusterIdWithBootstrapController(ClusterInstance clusterInstance) { String output = ToolsTestUtils.captureStandardOut(() -> @@ -83,6 +114,25 @@ public class ClusterToolTest { "the controller quorum.", exception.getCause().getMessage()); } + @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT}) + public void testListEndpointsWithBootstrapController(ClusterInstance clusterInstance) { + String output = ToolsTestUtils.captureStandardOut(() -> + assertDoesNotThrow(() -> ClusterTool.execute("list-endpoints", "--bootstrap-controller", clusterInstance.bootstrapControllers()))); + String port = clusterInstance.bootstrapControllers().split(":")[1]; + int id = clusterInstance.controllerIds().iterator().next(); + String format = "%-10s %-9s %-10s %-10s %-15s%n%-10s %-9s %-10s %-10s %-10s"; + String expected = String.format(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE", id, "localhost", port, "null", "controller"); + assertTrue(output.equals(expected)); + } + + @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT}) + public void testListEndpointsArgumentWithBootstrapController(ClusterInstance clusterInstance) { + RuntimeException exception = + assertThrows(RuntimeException.class, + () -> ClusterTool.execute("list-endpoints", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--include-fenced-brokers")); + assertEquals("The option --include-fenced-brokers is only supported with --bootstrap-server option", exception.getMessage()); + } 
+ @Test public void testPrintClusterId() throws Exception { Admin adminClient = new MockAdminClient.Builder().