This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch PLSR-2091_3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 73c635838df74e27112ce45c9fcd350a97234e83
Author: Ali Ahmed <[email protected]>
AuthorDate: Thu Jun 17 11:44:37 2021 -0700

    Support new topic format for broker admin healthcheck endpoint.
    
    * This runs the health check with the new topic format
    * This helps the pulsar-admin tool work correctly
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 83 ++++++++++++++--------
 .../pulsar/broker/namespace/NamespaceService.java  | 35 ++++++++-
 .../apache/pulsar/broker/SLAMonitoringTest.java    |  2 +-
 .../broker/admin/AdminApiHealthCheckTest.java      | 83 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  4 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |  4 +-
 .../org/apache/pulsar/client/admin/Brokers.java    | 15 ++++
 .../apache/pulsar/common/naming/TopicVersion.java  | 24 +++++++
 .../pulsar/client/admin/internal/BrokersImpl.java  | 20 +++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 +-
 .../org/apache/pulsar/admin/cli/CmdBrokers.java    |  6 +-
 11 files changed, 239 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 2e80dd3..01c7921 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin.impl;
 import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.time.Duration;
@@ -35,11 +36,13 @@ import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService.State;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
@@ -55,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerInfo;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -298,13 +302,35 @@ public class BrokersBase extends PulsarWebResource {
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Cluster doesn't exist"),
         @ApiResponse(code = 500, message = "Internal server error")})
-    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws 
Exception {
-        validateSuperUserAccess();
-        String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
-                pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
-        String topic = String.format("persistent://%s/healthcheck", 
heartbeatNamespace);
+    @ApiParam(value = "Topic Version")
+    public void healthcheck(@Suspended AsyncResponse asyncResponse,
+                            @QueryParam("topicVersion") TopicVersion 
topicVersion) throws Exception {
+        String topic;
+        PulsarClient client;
+        try {
+            validateSuperUserAccess();
+            String heartbeatNamespace;
+
+            heartbeatNamespace = (topicVersion == TopicVersion.V2)
+                    ?
+                    NamespaceService.getHeartbeatNamespaceV2(
+                            pulsar().getAdvertisedAddress(),
+                            pulsar().getConfiguration())
+                    :
+                    NamespaceService.getHeartbeatNamespace(
+                            pulsar().getAdvertisedAddress(),
+                            pulsar().getConfiguration());
+
+
+            topic = String.format("persistent://%s/healthcheck", 
heartbeatNamespace);
 
-        PulsarClient client = pulsar().getClient();
+            LOG.info("Running healthCheck with topic={}", topic);
+
+            client = pulsar().getClient();
+        } catch (Exception e) {
+            LOG.error("Error getting heathcheck topic info", e);
+            throw new PulsarServerException(e);
+        }
 
         String messageStr = UUID.randomUUID().toString();
         // create non-partitioned topic manually and close the previous reader 
if present.
@@ -321,10 +347,11 @@ public class BrokersBase extends PulsarWebResource {
         } catch (Exception e) {
             LOG.warn("Failed to try to delete subscriptions for health check", 
e);
         }
+
         CompletableFuture<Producer<String>> producerFuture =
-            client.newProducer(Schema.STRING).topic(topic).createAsync();
+                client.newProducer(Schema.STRING).topic(topic).createAsync();
         CompletableFuture<Reader<String>> readerFuture = 
client.newReader(Schema.STRING)
-            .topic(topic).startMessageId(MessageId.latest).createAsync();
+                .topic(topic).startMessageId(MessageId.latest).createAsync();
 
         CompletableFuture<Void> completePromise = new CompletableFuture<>();
 
@@ -334,7 +361,7 @@ public class BrokersBase extends PulsarWebResource {
                         completePromise.completeExceptionally(exception);
                     } else {
                         producerFuture.thenCompose((producer) -> 
producer.sendAsync(messageStr))
-                            .whenComplete((ignore2, exception2) -> {
+                                .whenComplete((ignore2, exception2) -> {
                                     if (exception2 != null) {
                                         
completePromise.completeExceptionally(exception2);
                                     }
@@ -351,26 +378,26 @@ public class BrokersBase extends PulsarWebResource {
                 });
 
         completePromise.whenComplete((ignore, exception) -> {
-                producerFuture.thenAccept((producer) -> {
-                        producer.closeAsync().whenComplete((ignore2, 
exception2) -> {
-                                if (exception2 != null) {
-                                    LOG.warn("Error closing producer for 
healthcheck", exception2);
-                                }
-                            });
-                    });
-                readerFuture.thenAccept((reader) -> {
-                        reader.closeAsync().whenComplete((ignore2, exception2) 
-> {
-                                if (exception2 != null) {
-                                    LOG.warn("Error closing reader for 
healthcheck", exception2);
-                                }
-                            });
-                    });
-                if (exception != null) {
-                    asyncResponse.resume(new RestException(exception));
-                } else {
-                    asyncResponse.resume("ok");
-                }
+            producerFuture.thenAccept((producer) -> {
+                producer.closeAsync().whenComplete((ignore2, exception2) -> {
+                    if (exception2 != null) {
+                        LOG.warn("Error closing producer for healthcheck", 
exception2);
+                    }
+                });
             });
+            readerFuture.thenAccept((reader) -> {
+                reader.closeAsync().whenComplete((ignore2, exception2) -> {
+                    if (exception2 != null) {
+                        LOG.warn("Error closing reader for healthcheck", 
exception2);
+                    }
+                });
+            });
+            if (exception != null) {
+                asyncResponse.resume(new RestException(exception));
+            } else {
+                asyncResponse.resume("ok");
+            }
+        });
     }
 
     private void healthcheckReadLoop(CompletableFuture<Reader<String>> 
readerFuture,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fd6c74e..bf652df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -140,8 +140,10 @@ public class NamespaceService implements AutoCloseable {
 
     public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
     public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = 
Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
+    public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = 
Pattern.compile("pulsar/([^:]+:\\d+)");
     public static final Pattern SLA_NAMESPACE_PATTERN = 
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
     public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
+    public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s:%s";
     public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + 
"/%s/%s:%s";
 
     public static final String NAMESPACE_ISOLATION_POLICIES = 
"namespaceIsolationPolicies";
@@ -305,6 +307,12 @@ public class NamespaceService implements AutoCloseable {
             LOG.info("added heartbeat namespace name in local cache: ns={}", 
getHeartbeatNamespace(host, config));
         }
 
+        // ensure that we own the heartbeat namespace
+        if (registerNamespace(getHeartbeatNamespaceV2(host, config), true)) {
+            this.uncountedNamespaces++;
+            LOG.info("added heartbeat namespace name in local cache: ns={}", 
getHeartbeatNamespaceV2(host, config));
+        }
+
         // we may not need strict ownership checking for bootstrap names for 
now
         for (String namespace : config.getBootstrapNamespaces()) {
             if (registerNamespace(namespace, false)) {
@@ -471,6 +479,9 @@ public class NamespaceService implements AutoCloseable {
             // check if this is Heartbeat or SLAMonitor namespace
             candidateBroker = checkHeartbeatNamespace(bundle);
             if (candidateBroker == null) {
+                candidateBroker = checkHeartbeatNamespaceV2(bundle);
+            }
+            if (candidateBroker == null) {
                 String broker = getSLAMonitorBrokerName(bundle);
                 // checking if the broker is up and running
                 if (broker != null && isBrokerActive(broker)) {
@@ -1219,7 +1230,6 @@ public class NamespaceService implements AutoCloseable {
         return client.getLookup().getTopicsUnderNamespace(namespace, 
Mode.NON_PERSISTENT);
     }
 
-
     public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
         PulsarClientImpl client = namespaceClients.get(cluster);
         if (client != null) {
@@ -1295,7 +1305,18 @@ public class NamespaceService implements AutoCloseable {
         }
         return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), 
host, port);
     }
-     public static String getSLAMonitorNamespace(String host, 
ServiceConfiguration config) {
+
+    public static String getHeartbeatNamespaceV2(String host, 
ServiceConfiguration config) {
+        Integer port = null;
+        if (config.getWebServicePort().isPresent()) {
+            port = config.getWebServicePort().get();
+        } else if (config.getWebServicePortTls().isPresent()) {
+            port = config.getWebServicePortTls().get();
+        }
+        return String.format(HEARTBEAT_NAMESPACE_FMT_V2, host, port);
+    }
+
+    public static String getSLAMonitorNamespace(String host, 
ServiceConfiguration config) {
         Integer port = null;
         if (config.getWebServicePort().isPresent()) {
             port = config.getWebServicePort().get();
@@ -1315,6 +1336,16 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public static String checkHeartbeatNamespaceV2(ServiceUnitId ns) {
+        Matcher m = 
HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(ns.getNamespaceObject().toString());
+        if (m.matches()) {
+            LOG.debug("SLAMonitoring namespace matched the lookup namespace 
{}", ns.getNamespaceObject().toString());
+            return String.format("http://%s", m.group(1));
+        } else {
+            return null;
+        }
+    }
+
     public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
         Matcher m = 
SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
         if (m.matches()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index ce55717..17eceaa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -162,7 +162,7 @@ public class SLAMonitoringTest {
 
                 Map<String, NamespaceOwnershipStatus> nsMap = 
pulsarAdmins[i].brokers().getOwnedNamespaces("my-cluster",
                         list.get(0));
-                Assert.assertEquals(nsMap.size(), 2);
+                Assert.assertEquals(nsMap.size(), 3);
             }
         } catch (Exception e) {
             e.printStackTrace();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
new file mode 100644
index 0000000..e0c887f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicVersion;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.URL;
+
+@Test(groups = "broker")
+@Slf4j
+public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        resetConfig();
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(
+                Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("pulsar", tenantInfo);
+        admin.namespaces().createNamespace("pulsar/system", 
Sets.newHashSet("test"));
+        admin.tenants().createTenant("public", tenantInfo);
+        admin.namespaces().createNamespace("public/default", 
Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testHealthCheckup() throws Exception {
+        admin.brokers().healthcheck();
+    }
+
+    @Test
+    public void testHealthCheckupV1() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V1);
+    }
+
+    @Test(expectedExceptions = PulsarAdminException.class)
+    public void testHealthCheckupV2Error() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+    }
+
+    @Test
+    public void testHealthCheckupV2() throws Exception {
+        final URL pulsarWebAddress = new URL(pulsar.getWebServiceAddress());
+        final String targetNameSpace = "pulsar/" +
+                pulsarWebAddress.getHost() + ":" + pulsarWebAddress.getPort();
+        log.info("Target namespace for broker admin healthcheck V2 endpoint is 
{}", targetNameSpace);
+        admin.namespaces().createNamespace(targetNameSpace);
+        admin.brokers().healthcheck(TopicVersion.V2);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 8061b84..5d96389 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -474,7 +474,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         Map<String, NamespaceOwnershipStatus> nsMap = 
admin.brokers().getOwnedNamespaces("test", list.get(0));
         // since sla-monitor ns is not created nsMap.size() == 1 (for 
HeartBeat Namespace)
-        Assert.assertEquals(nsMap.size(), 1);
+        Assert.assertEquals(nsMap.size(), 2);
         for (String ns : nsMap.keySet()) {
             NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
             if (ns.equals(
@@ -490,7 +490,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(parts.length, 2);
         Map<String, NamespaceOwnershipStatus> nsMap2 = 
adminTls.brokers().getOwnedNamespaces("test",
                 String.format("%s:%d", parts[0], 
pulsar.getListenPortHTTPS().get()));
-        Assert.assertEquals(nsMap2.size(), 1);
+        Assert.assertEquals(nsMap2.size(), 2);
 
         admin.namespaces().deleteNamespace("prop-xyz/ns1");
         admin.clusters().deleteCluster("test");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 58ee083..3f67eb6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -410,7 +410,7 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         Map<String, NamespaceOwnershipStatus> nsMap = 
admin.brokers().getOwnedNamespaces("use", list.get(0));
         // since sla-monitor ns is not created nsMap.size() == 1 (for 
HeartBeat Namespace)
-        Assert.assertEquals(nsMap.size(), 1);
+        Assert.assertEquals(nsMap.size(), 2);
         for (String ns : nsMap.keySet()) {
             NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
             if (ns.equals(
@@ -426,7 +426,7 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(parts.length, 2);
         Map<String, NamespaceOwnershipStatus> nsMap2 = 
adminTls.brokers().getOwnedNamespaces("use",
                 String.format("%s:%d", parts[0], 
pulsar.getListenPortHTTPS().get()));
-        Assert.assertEquals(nsMap2.size(), 1);
+        Assert.assertEquals(nsMap2.size(), 2);
 
         admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
         admin.clusters().deleteCluster("use");
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index c444b2b..100988c 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerInfo;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 
@@ -268,14 +269,28 @@ public interface Brokers {
      *
      * @throws PulsarAdminException if the healthcheck fails.
      */
+    @Deprecated
     void healthcheck() throws PulsarAdminException;
 
     /**
      * Run a healthcheck on the broker asynchronously.
      */
+    @Deprecated
     CompletableFuture<Void> healthcheckAsync();
 
     /**
+     * Run a healthcheck on the broker.
+     *
+     * @throws PulsarAdminException if the healthcheck fails.
+     */
+    void healthcheck(TopicVersion topicVersion) throws PulsarAdminException;
+
+    /**
+     * Run a healthcheck on the broker asynchronously.
+     */
+    CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
+
+    /**
      * Get version of broker.
      * @return version of broker.
      */
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java
new file mode 100644
index 0000000..48bed00
--- /dev/null
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+public enum TopicVersion {
+    V1,
+    V2,
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 53a3703..b520523 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.admin.Brokers;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerInfo;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.util.Codec;
@@ -352,9 +353,21 @@ public class BrokersImpl extends BaseResource implements 
Brokers {
     }
 
     @Override
+    @Deprecated
     public void healthcheck() throws PulsarAdminException {
+        healthcheck(TopicVersion.V1);
+    }
+
+    @Override
+    @Deprecated
+    public CompletableFuture<Void> healthcheckAsync() {
+        return healthcheckAsync(TopicVersion.V1);
+    }
+
+    @Override
+    public void healthcheck(TopicVersion topicVersion) throws 
PulsarAdminException {
         try {
-            healthcheckAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+            healthcheckAsync(topicVersion).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -366,8 +379,11 @@ public class BrokersImpl extends BaseResource implements 
Brokers {
     }
 
     @Override
-    public CompletableFuture<Void> healthcheckAsync() {
+    public CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion) 
{
         WebTarget path = adminBrokers.path("health");
+        if (topicVersion != null) {
+            path = path.queryParam("topicVersion", topicVersion);
+        }
         final CompletableFuture<Void> future = new CompletableFuture<>();
         asyncGetRequest(path,
                 new InvocationCallback<String>() {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 47e075f..11f5156 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -74,11 +74,9 @@ import 
org.apache.pulsar.common.policies.data.BookiesClusterInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
@@ -136,7 +134,7 @@ public class PulsarAdminToolTest {
         verify(mockBrokers).getRuntimeConfigurations();
 
         brokers.run(split("healthcheck"));
-        verify(mockBrokers).healthcheck();
+        verify(mockBrokers).healthcheck(null);
 
         brokers.run(split("version"));
         verify(mockBrokers).getVersion();
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 1fe1c29..a9a0dbe 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -22,6 +22,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import org.apache.pulsar.common.naming.TopicVersion;
 
 import java.util.function.Supplier;
 
@@ -127,9 +128,12 @@ public class CmdBrokers extends CmdBase {
     @Parameters(commandDescription = "Run a health check against the broker")
     private class HealthcheckCmd extends CliCommand {
 
+        @Parameter(names = "--topic-version", description = "topic version V1 
is default")
+        private TopicVersion topicVersion;
+
         @Override
         void run() throws Exception {
-            getAdmin().brokers().healthcheck();
+            getAdmin().brokers().healthcheck(topicVersion);
             System.out.println("ok");
         }
 

Reply via email to