This is an automated email from the ASF dual-hosted git repository. aahmed pushed a commit to branch PLSR-2091_3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 73c635838df74e27112ce45c9fcd350a97234e83 Author: Ali Ahmed <[email protected]> AuthorDate: Thu Jun 17 11:44:37 2021 -0700 Support new topic format for broker admin healthcheck endpoint. * This runs the health check with new topic format * This help the pulsar-admin tool work correctly --- .../pulsar/broker/admin/impl/BrokersBase.java | 83 ++++++++++++++-------- .../pulsar/broker/namespace/NamespaceService.java | 35 ++++++++- .../apache/pulsar/broker/SLAMonitoringTest.java | 2 +- .../broker/admin/AdminApiHealthCheckTest.java | 83 ++++++++++++++++++++++ .../apache/pulsar/broker/admin/AdminApiTest.java | 4 +- .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 4 +- .../org/apache/pulsar/client/admin/Brokers.java | 15 ++++ .../apache/pulsar/common/naming/TopicVersion.java | 24 +++++++ .../pulsar/client/admin/internal/BrokersImpl.java | 20 +++++- .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +- .../org/apache/pulsar/admin/cli/CmdBrokers.java | 6 +- 11 files changed, 239 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 2e80dd3..01c7921 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin.impl; import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; import com.google.common.collect.Maps; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.time.Duration; @@ -35,11 +36,13 @@ import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService.State; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LeaderBroker; @@ -55,6 +58,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.util.FutureUtil; @@ -298,13 +302,35 @@ public class BrokersBase extends PulsarWebResource { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")}) - public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception { - validateSuperUserAccess(); - String heartbeatNamespace = NamespaceService.getHeartbeatNamespace( - pulsar().getAdvertisedAddress(), pulsar().getConfiguration()); - String topic = String.format("persistent://%s/healthcheck", heartbeatNamespace); + @ApiParam(value = "Topic Version") + public void healthcheck(@Suspended AsyncResponse asyncResponse, + @QueryParam("topicVersion") TopicVersion topicVersion) throws Exception { + String topic; + PulsarClient client; + try { + validateSuperUserAccess(); + String heartbeatNamespace; + + heartbeatNamespace = (topicVersion == TopicVersion.V2) + ? + NamespaceService.getHeartbeatNamespaceV2( + pulsar().getAdvertisedAddress(), + pulsar().getConfiguration()) + : + NamespaceService.getHeartbeatNamespace( + pulsar().getAdvertisedAddress(), + pulsar().getConfiguration()); + + + topic = String.format("persistent://%s/healthcheck", heartbeatNamespace); - PulsarClient client = pulsar().getClient(); + LOG.info("Running healthCheck with topic={}", topic); + + client = pulsar().getClient(); + } catch (Exception e) { + LOG.error("Error getting heathcheck topic info", e); + throw new PulsarServerException(e); + } String messageStr = UUID.randomUUID().toString(); // create non-partitioned topic manually and close the previous reader if present. @@ -321,10 +347,11 @@ public class BrokersBase extends PulsarWebResource { } catch (Exception e) { LOG.warn("Failed to try to delete subscriptions for health check", e); } + CompletableFuture<Producer<String>> producerFuture = - client.newProducer(Schema.STRING).topic(topic).createAsync(); + client.newProducer(Schema.STRING).topic(topic).createAsync(); CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING) - .topic(topic).startMessageId(MessageId.latest).createAsync(); + .topic(topic).startMessageId(MessageId.latest).createAsync(); CompletableFuture<Void> completePromise = new CompletableFuture<>(); @@ -334,7 +361,7 @@ public class BrokersBase extends PulsarWebResource { completePromise.completeExceptionally(exception); } else { producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr)) - .whenComplete((ignore2, exception2) -> { + .whenComplete((ignore2, exception2) -> { if (exception2 != null) { completePromise.completeExceptionally(exception2); } @@ -351,26 +378,26 @@ public class BrokersBase extends PulsarWebResource { }); completePromise.whenComplete((ignore, exception) -> { - producerFuture.thenAccept((producer) -> { - producer.closeAsync().whenComplete((ignore2, exception2) -> { - if (exception2 != null) { - LOG.warn("Error closing producer for healthcheck", exception2); - } - }); - }); - readerFuture.thenAccept((reader) -> { - reader.closeAsync().whenComplete((ignore2, exception2) -> { - if (exception2 != null) { - LOG.warn("Error closing reader for healthcheck", exception2); - } - }); - }); - if (exception != null) { - asyncResponse.resume(new RestException(exception)); - } else { - asyncResponse.resume("ok"); - } + producerFuture.thenAccept((producer) -> { + producer.closeAsync().whenComplete((ignore2, exception2) -> { + if (exception2 != null) { + LOG.warn("Error closing producer for healthcheck", exception2); + } + }); }); + readerFuture.thenAccept((reader) -> { + reader.closeAsync().whenComplete((ignore2, exception2) -> { + if (exception2 != null) { + LOG.warn("Error closing reader for healthcheck", exception2); + } + }); + }); + if (exception != null) { + asyncResponse.resume(new RestException(exception)); + } else { + asyncResponse.resume("ok"); + } + }); } private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fd6c74e..bf652df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -140,8 +140,10 @@ public class NamespaceService implements AutoCloseable { public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor"; public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)"); + public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)"); public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)"); public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s"; + public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s:%s"; public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s"; public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies"; @@ -305,6 +307,12 @@ public class NamespaceService implements AutoCloseable { LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespace(host, config)); } + // ensure that we own the heartbeat namespace + if (registerNamespace(getHeartbeatNamespaceV2(host, config), true)) { + this.uncountedNamespaces++; + LOG.info("added heartbeat namespace name in local cache: ns={}", getHeartbeatNamespaceV2(host, config)); + } + // we may not need strict ownership checking for bootstrap names for now for (String namespace : config.getBootstrapNamespaces()) { if (registerNamespace(namespace, false)) { @@ -471,6 +479,9 @@ public class NamespaceService implements AutoCloseable { // check if this is Heartbeat or SLAMonitor namespace candidateBroker = checkHeartbeatNamespace(bundle); if (candidateBroker == null) { + candidateBroker = checkHeartbeatNamespaceV2(bundle); + } + if (candidateBroker == null) { String broker = getSLAMonitorBrokerName(bundle); // checking if the broker is up and running if (broker != null && isBrokerActive(broker)) { @@ -1219,7 +1230,6 @@ public class NamespaceService implements AutoCloseable { return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT); } - public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) { PulsarClientImpl client = namespaceClients.get(cluster); if (client != null) { @@ -1295,7 +1305,18 @@ public class NamespaceService implements AutoCloseable { } return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port); } - public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) { + + public static String getHeartbeatNamespaceV2(String host, ServiceConfiguration config) { + Integer port = null; + if (config.getWebServicePort().isPresent()) { + port = config.getWebServicePort().get(); + } else if (config.getWebServicePortTls().isPresent()) { + port = config.getWebServicePortTls().get(); + } + return String.format(HEARTBEAT_NAMESPACE_FMT_V2, host, port); + } + + public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) { Integer port = null; if (config.getWebServicePort().isPresent()) { port = config.getWebServicePort().get(); @@ -1315,6 +1336,16 @@ public class NamespaceService implements AutoCloseable { } } + public static String checkHeartbeatNamespaceV2(ServiceUnitId ns) { + Matcher m = HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(ns.getNamespaceObject().toString()); + if (m.matches()) { + LOG.debug("SLAMonitoring namespace matched the lookup namespace {}", ns.getNamespaceObject().toString()); + return String.format("http://%s", m.group(1)); + } else { + return null; + } + } + public static String getSLAMonitorBrokerName(ServiceUnitId ns) { Matcher m = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString()); if (m.matches()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index ce55717..17eceaa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -162,7 +162,7 @@ public class SLAMonitoringTest { Map<String, NamespaceOwnershipStatus> nsMap = pulsarAdmins[i].brokers().getOwnedNamespaces("my-cluster", list.get(0)); - Assert.assertEquals(nsMap.size(), 2); + Assert.assertEquals(nsMap.size(), 3); } } catch (Exception e) { e.printStackTrace(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java new file mode 100644 index 0000000..e0c887f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicVersion; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.URL; + +@Test(groups = "broker") +@Slf4j +public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest { + + @BeforeMethod + @Override + public void setup() throws Exception { + resetConfig(); + super.internalSetup(); + admin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl( + Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("pulsar", tenantInfo); + admin.namespaces().createNamespace("pulsar/system", Sets.newHashSet("test")); + admin.tenants().createTenant("public", tenantInfo); + admin.namespaces().createNamespace("public/default", Sets.newHashSet("test")); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testHealthCheckup() throws Exception { + admin.brokers().healthcheck(); + } + + @Test + public void testHealthCheckupV1() throws Exception { + admin.brokers().healthcheck(TopicVersion.V1); + } + + @Test(expectedExceptions = PulsarAdminException.class) + public void testHealthCheckupV2Error() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + } + + @Test + public void testHealthCheckupV2() throws Exception { + final URL pulsarWebAddress = new URL(pulsar.getWebServiceAddress()); + final String targetNameSpace = "pulsar/" + + pulsarWebAddress.getHost() + ":" + pulsarWebAddress.getPort(); + log.info("Target namespace for broker admin healthcheck V2 endpoint is {}", targetNameSpace); + admin.namespaces().createNamespace(targetNameSpace); + admin.brokers().healthcheck(TopicVersion.V2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 8061b84..5d96389 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -474,7 +474,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) - Assert.assertEquals(nsMap.size(), 1); + Assert.assertEquals(nsMap.size(), 2); for (String ns : nsMap.keySet()) { NamespaceOwnershipStatus nsStatus = nsMap.get(ns); if (ns.equals( @@ -490,7 +490,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(parts.length, 2); Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("test", String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get())); - Assert.assertEquals(nsMap2.size(), 1); + Assert.assertEquals(nsMap2.size(), 2); admin.namespaces().deleteNamespace("prop-xyz/ns1"); admin.clusters().deleteCluster("test"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 58ee083..3f67eb6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -410,7 +410,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("use", list.get(0)); // since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace) - Assert.assertEquals(nsMap.size(), 1); + Assert.assertEquals(nsMap.size(), 2); for (String ns : nsMap.keySet()) { NamespaceOwnershipStatus nsStatus = nsMap.get(ns); if (ns.equals( @@ -426,7 +426,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(parts.length, 2); Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("use", String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get())); - Assert.assertEquals(nsMap2.size(), 1); + Assert.assertEquals(nsMap2.size(), 2); admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java index c444b2b..100988c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; @@ -268,14 +269,28 @@ public interface Brokers { * * @throws PulsarAdminException if the healthcheck fails. */ + @Deprecated void healthcheck() throws PulsarAdminException; /** * Run a healthcheck on the broker asynchronously. */ + @Deprecated CompletableFuture<Void> healthcheckAsync(); /** + * Run a healthcheck on the broker. + * + * @throws PulsarAdminException if the healthcheck fails. + */ + void healthcheck(TopicVersion topicVersion) throws PulsarAdminException; + + /** + * Run a healthcheck on the broker asynchronously. + */ + CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion); + + /** * Get version of broker. * @return version of broker. */ diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java new file mode 100644 index 0000000..48bed00 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +public enum TopicVersion { + V1, + V2, +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index 53a3703..b520523 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.admin.Brokers; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerInfo; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.util.Codec; @@ -352,9 +353,21 @@ public class BrokersImpl extends BaseResource implements Brokers { } @Override + @Deprecated public void healthcheck() throws PulsarAdminException { + healthcheck(TopicVersion.V1); + } + + @Override + @Deprecated + public CompletableFuture<Void> healthcheckAsync() { + return healthcheckAsync(TopicVersion.V1); + } + + @Override + public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException { try { - healthcheckAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + healthcheckAsync(topicVersion).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -366,8 +379,11 @@ public class BrokersImpl extends BaseResource implements Brokers { } @Override - public CompletableFuture<Void> healthcheckAsync() { + public CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion) { WebTarget path = adminBrokers.path("health"); + if (topicVersion != null) { + path = path.queryParam("topicVersion", topicVersion); + } final CompletableFuture<Void> future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback<String>() { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 47e075f..11f5156 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -74,11 +74,9 @@ import org.apache.pulsar.common.policies.data.BookiesClusterInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.FailureDomain; -import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo; @@ -136,7 +134,7 @@ public class PulsarAdminToolTest { verify(mockBrokers).getRuntimeConfigurations(); brokers.run(split("healthcheck")); - verify(mockBrokers).healthcheck(); + verify(mockBrokers).healthcheck(null); brokers.run(split("version")); verify(mockBrokers).getVersion(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java index 1fe1c29..a9a0dbe 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java @@ -22,6 +22,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; +import org.apache.pulsar.common.naming.TopicVersion; import java.util.function.Supplier; @@ -127,9 +128,12 @@ public class CmdBrokers extends CmdBase { @Parameters(commandDescription = "Run a health check against the broker") private class HealthcheckCmd extends CliCommand { + @Parameter(names = "--topic-version", description = "topic version V1 is default") + private TopicVersion topicVersion; + @Override void run() throws Exception { - getAdmin().brokers().healthcheck(); + getAdmin().brokers().healthcheck(topicVersion); System.out.println("ok"); }
