mumrah commented on code in PR #14399:
URL: https://github.com/apache/kafka/pull/14399#discussion_r1333463150


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4009,12 +4134,11 @@ void handleFailure(Throwable throwable) {
 
     @Override
     public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
-        NodeProvider provider = new LeastLoadedNodeProvider();
-
         final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call(
-                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()),
+                bc -> bc ? new ControllerNodeProvider() : new LeastLoadedNodeProvider()) {

Review Comment:
   What does `bc` stand for? Broker + Controller?
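
A minimal sketch of how a longer lambda parameter name could answer this question at the call site. It assumes the flag means "the client is using bootstrap controllers", which the `usingBootstrapControllers()` check elsewhere in this PR suggests but does not confirm. The types below are hypothetical stand-ins, not the real `KafkaAdminClient` inner classes.

```java
import java.util.function.Function;

public class NodeProviderChoice {
    /** Hypothetical stand-in for the admin client's NodeProvider. */
    interface NodeProvider {
        String describe();
    }

    static class ControllerNodeProvider implements NodeProvider {
        @Override
        public String describe() {
            return "controller";
        }
    }

    static class LeastLoadedNodeProvider implements NodeProvider {
        @Override
        public String describe() {
            return "least-loaded";
        }
    }

    // Assumption: the boolean reports whether the client was bootstrapped
    // from controllers. Naming the parameter states that intent directly.
    static final Function<Boolean, NodeProvider> PROVIDER_FACTORY =
        usingBootstrapControllers -> usingBootstrapControllers
            ? new ControllerNodeProvider()
            : new LeastLoadedNodeProvider();

    public static void main(String[] args) {
        System.out.println(PROVIDER_FACTORY.apply(true).describe());
        System.out.println(PROVIDER_FACTORY.apply(false).describe());
    }
}
```

The behavior is the same as with `bc`; only the parameter name changes.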



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -1457,6 +1495,58 @@ void call(Call call, long now) {
          * Create a new metadata call.
          */
         private Call makeMetadataCall(long now) {
+            if (metadataManager.usingBootstrapControllers()) {
+                return makeControllerMetadataCall(now);
+            } else {
+                return makeBrokerMetadataCall(now);
+            }
+        }
+
+        private Call makeControllerMetadataCall(long now) {
+            // Use DescribeCluster here, as specified by KIP-919.
+            return new Call(true, "describeCluster", calcDeadlineMs(now, requestTimeoutMs),
+                    new MetadataUpdateNodeIdProvider()) {
+                @Override
+                public DescribeClusterRequest.Builder createRequest(int timeoutMs) {
+                    return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
+                            .setIncludeClusterAuthorizedOperations(false)
+                            .setEndpointType(EndpointType.CONTROLLER.id()));

Review Comment:
   nit: too much indentation



##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -1028,6 +1028,10 @@ class ControllerApis(
   def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {

Review Comment:
   Does this request only ever originate from the admin client?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -719,38 +726,66 @@ public Node provide() {
 
     abstract class Call {
         private final boolean internal;
+        private final boolean supportsUseControllers;
         private final String callName;
         private final long deadlineMs;
         private final NodeProvider nodeProvider;
         protected int tries;
         private Node curNode = null;
         private long nextAllowedTryMs;
 
-        Call(boolean internal,
-             String callName,
-             long nextAllowedTryMs,
-             int tries,
-             long deadlineMs,
-             NodeProvider nodeProvider
+        Call(

Review Comment:
   The other constructors all supply different defaults for
`supportsUseControllers`. Is that expected? When should a constructor default
to true vs. false?
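
One possible way to make the default unambiguous, sketched with a simplified, hypothetical `Call` (not the actual class in this PR): every convenience constructor delegates to the full one, and the default lives in a single named constant. The constant name and the choice of `false` as the default are assumptions for illustration only.

```java
public class CallDefaults {
    static class Call {
        // Hypothetical: one named default instead of a literal per constructor.
        static final boolean DEFAULT_SUPPORTS_USE_CONTROLLERS = false;

        final boolean internal;
        final boolean supportsUseControllers;
        final String callName;
        final long deadlineMs;

        // Full constructor: the only place where fields are assigned.
        Call(boolean internal, boolean supportsUseControllers, String callName, long deadlineMs) {
            this.internal = internal;
            this.supportsUseControllers = supportsUseControllers;
            this.callName = callName;
            this.deadlineMs = deadlineMs;
        }

        // Convenience constructors delegate, so the default cannot diverge.
        Call(boolean internal, String callName, long deadlineMs) {
            this(internal, DEFAULT_SUPPORTS_USE_CONTROLLERS, callName, deadlineMs);
        }

        Call(String callName, long deadlineMs) {
            this(false, callName, deadlineMs);
        }
    }

    public static void main(String[] args) {
        Call implicit = new Call("describeCluster", 1_000L);
        Call explicit = new Call(false, true, "describeMetadataQuorum", 1_000L);
        System.out.println(implicit.callName + " supportsUseControllers=" + implicit.supportsUseControllers);
        System.out.println(explicit.callName + " supportsUseControllers=" + explicit.supportsUseControllers);
    }
}
```

With this shape, a call that needs a non-default value has to say so explicitly at the construction site.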



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

