This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4fb60ee Add healthcheck HTTP endpoint (#2883) 4fb60ee is described below commit 4fb60ee36972f46c570864f1ecd01a373fcf7c7b Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue Nov 6 07:58:19 2018 +0100 Add healthcheck HTTP endpoint (#2883) * Add healthcheck HTTP endpoint Triggering the endpoint causes a client to try to produce and consume messages on the local broker. * Async * Add HealthcheckTest to the suite list (this list is cumbersome to maintain, and integration tests missing from it will not be run). Also, disable the one test that depends on admin client timeouts * typo * Move timeout to outside read loop --- .../pulsar/broker/admin/impl/BrokersBase.java | 101 ++++++++++++++++++ .../org/apache/pulsar/client/admin/Brokers.java | 7 ++ .../pulsar/client/admin/internal/BrokersImpl.java | 11 ++ .../org/apache/pulsar/admin/cli/CmdBrokers.java | 14 ++- .../tests/integration/cli/HealthcheckTest.java | 115 +++++++++++++++++++++ .../integration/topologies/PulsarCluster.java | 24 +++++ .../src/test/resources/pulsar-process.xml | 3 +- 7 files changed, 273 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 5925108..c4d504c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -23,11 +23,18 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; 
+import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Response.Status; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -35,8 +42,15 @@ import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -197,4 +211,91 @@ public class BrokersBase extends AdminResource { pulsar().getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null)); } + @GET + @Path("/health") + @ApiOperation(value = "Run a healthcheck against the broker") + @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is OK"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Cluster doesn't exist") }) + public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception { + validateSuperUserAccess(); + String heartbeatNamespace = NamespaceService.getHeartbeatNamespace( + pulsar().getAdvertisedAddress(), pulsar().getConfiguration()); + String topic = String.format("persistent://%s/healthcheck", heartbeatNamespace); + + PulsarClient client = pulsar().getClient(); + + String messageStr = UUID.randomUUID().toString(); + CompletableFuture<Producer<String>> producerFuture 
= + client.newProducer(Schema.STRING).topic(topic).createAsync(); + CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING) + .topic(topic).startMessageId(MessageId.latest).createAsync(); + + CompletableFuture<Void> completePromise = new CompletableFuture<>(); + + CompletableFuture.allOf(producerFuture, readerFuture).whenComplete( + (ignore, exception) -> { + if (exception != null) { + completePromise.completeExceptionally(exception); + } else { + producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr)) + .whenComplete((ignore2, exception2) -> { + if (exception2 != null) { + completePromise.completeExceptionally(exception2); + } + }); + + healthcheckReadLoop(readerFuture, completePromise, messageStr); + + // timeout read loop after 10 seconds + ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> { + completePromise.completeExceptionally(new TimeoutException("Timed out reading")); + }, 10, TimeUnit.SECONDS); + // don't leave timeout dangling + completePromise.whenComplete((ignore2, exception2) -> { + timeout.cancel(false); + }); + } + }); + + completePromise.whenComplete((ignore, exception) -> { + producerFuture.thenAccept((producer) -> { + producer.closeAsync().whenComplete((ignore2, exception2) -> { + if (exception2 != null) { + LOG.warn("Error closing producer for healthcheck", exception2); + } + }); + }); + readerFuture.thenAccept((reader) -> { + reader.closeAsync().whenComplete((ignore2, exception2) -> { + if (exception2 != null) { + LOG.warn("Error closing reader for healthcheck", exception2); + } + }); + }); + if (exception != null) { + asyncResponse.resume(new RestException(exception)); + } else { + asyncResponse.resume("ok"); + } + }); + } + + private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture, + CompletableFuture<?> completablePromise, + String messageStr) { + readerFuture.thenAccept((reader) -> { + CompletableFuture<Message<String>> readFuture = reader.readNextAsync() + 
.whenComplete((m, exception) -> { + if (exception != null) { + completablePromise.completeExceptionally(exception); + } else if (m.getValue().equals(messageStr)) { + completablePromise.complete(null); + } else { + healthcheckReadLoop(readerFuture, completablePromise, messageStr); + } + }); + }); + } } + diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java index a6fadd1..50503c1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -105,4 +105,11 @@ public interface Brokers { * @return internal configuration data. */ InternalConfigurationData getInternalConfigurationData() throws PulsarAdminException; + + /** + * Run a healthcheck on the broker. + * + * @throws an exception if the healthcheck fails. + */ + void healthcheck() throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index 52270d3..969f673 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -101,4 +101,15 @@ public class BrokersImpl extends BaseResource implements Brokers { } } + @Override + public void healthcheck() throws PulsarAdminException { + try { + String result = request(adminBrokers.path("/health")).get(String.class); + if (!result.trim().toLowerCase().equals("ok")) { + throw new PulsarAdminException("Healthcheck returned unexpected result: " + result); + } + } catch (Exception e) { + throw getApiException(e); + } + } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java index 9dc50a0..9a93474 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java @@ -92,7 +92,18 @@ public class CmdBrokers extends CmdBase { } } - + + @Parameters(commandDescription = "Run a health check against the broker") + private class HealthcheckCmd extends CliCommand { + + @Override + void run() throws Exception { + admin.brokers().healthcheck(); + System.out.println("ok"); + } + + } + public CmdBrokers(PulsarAdmin admin) { super("brokers", admin); jcommander.addCommand("list", new List()); @@ -101,5 +112,6 @@ public class CmdBrokers extends CmdBase { jcommander.addCommand("list-dynamic-config", new GetUpdatableConfigCmd()); jcommander.addCommand("get-all-dynamic-config", new GetAllConfigurationsCmd()); jcommander.addCommand("get-internal-config", new GetInternalConfigurationCmd()); + jcommander.addCommand("healthcheck", new HealthcheckCmd()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java new file mode 100644 index 0000000..7c74123 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.cli; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.pulsar.tests.integration.containers.BKContainer; +import org.apache.pulsar.tests.integration.containers.BrokerContainer; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; + +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Test the healthcheck command. 
+ */ +public class HealthcheckTest { + private final static Logger log = LoggerFactory.getLogger(HealthcheckTest.class); + private final PulsarClusterSpec spec = PulsarClusterSpec.builder() + .clusterName("HealthcheckTest-" + UUID.randomUUID().toString().substring(0, 8)) + .numProxies(0).numFunctionWorkers(0).enablePrestoWorker(false).build(); + private PulsarCluster pulsarCluster = null; + + @BeforeMethod + public void setupCluster() throws Exception { + pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + } + + @AfterMethod + public void tearDownCluster() throws Exception { + if (pulsarCluster != null) { + pulsarCluster.stop(); + pulsarCluster = null; + } + } + + @Test + public void testEverythingOK() throws Exception { + for (BrokerContainer b : pulsarCluster.getBrokers()) { + ContainerExecResult result = b.execCmd(PulsarCluster.ADMIN_SCRIPT, "brokers", "healthcheck"); + Assert.assertEquals(result.getExitCode(), 0); + Assert.assertEquals(result.getStdout().trim(), "ok"); + } + } + + private void assertHealthcheckFailure() throws Exception { + for (BrokerContainer b : pulsarCluster.getBrokers()) { + try { + b.execCmd(PulsarCluster.ADMIN_SCRIPT, "brokers", "healthcheck"); + Assert.fail("Should always fail"); + } catch (ContainerExecException e) { + Assert.assertEquals(e.getResult().getExitCode(), 1); + } + } + } + + @Test + public void testZooKeeperDown() throws Exception { + pulsarCluster.getZooKeeper().execCmd("pkill", "-STOP", "-f", "ZooKeeperStarter"); + assertHealthcheckFailure(); + } + + // Disabled until PulsarAdmin can time out (#2891) + // @Test + // public void testBrokerDown() throws Exception { + // for (BrokerContainer b : pulsarCluster.getBrokers()) { + // b.execCmd("pkill", "-STOP", "-f", "PulsarBrokerStarter"); + // } + // assertHealthcheckFailure(); + // } + + @Test + public void testBookKeeperDown() throws Exception { + for (BKContainer b : pulsarCluster.getBookies()) { + b.execCmd("pkill", "-STOP", "-f", "BookieServer"); + } + 
assertHealthcheckFailure(); + } + + private static Map<String, String> parseOutput(String output) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(output, new TypeReference<HashMap<String, String>>() {}); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 7f9986e..bbb5bd4 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -448,6 +448,14 @@ public class PulsarCluster { return brokerContainers.values(); } + public Collection<BKContainer> getBookies() { + return bookieContainers.values(); + } + + public ZKContainer getZooKeeper() { + return zkContainer; + } + public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception { return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands); } @@ -472,6 +480,22 @@ public class PulsarCluster { brokerContainers.values().forEach(BrokerContainer::start); } + public void stopAllBookies() { + bookieContainers.values().forEach(BKContainer::stop); + } + + public void startAllBookies() { + bookieContainers.values().forEach(BKContainer::start); + } + + public void stopZooKeeper() { + zkContainer.stop(); + } + + public void startZooKeeper() { + zkContainer.start(); + } + public ContainerExecResult createNamespace(String nsName) throws Exception { return runAdminCommandOnAnyBroker( "namespaces", "create", "public/" + nsName, diff --git a/tests/integration/src/test/resources/pulsar-process.xml b/tests/integration/src/test/resources/pulsar-process.xml index e29a05e..946c2f9 100644 --- a/tests/integration/src/test/resources/pulsar-process.xml +++ b/tests/integration/src/test/resources/pulsar-process.xml @@ -26,6 +26,7 @@ <class 
name="org.apache.pulsar.tests.integration.compaction.TestCompaction" /> <class name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest" /> <class name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" /> + <class name="org.apache.pulsar.tests.integration.cli.HealthcheckTest" /> </classes> </test> -</suite> \ No newline at end of file +</suite>