This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4fb60ee Add healthcheck HTTP endpoint (#2883)
4fb60ee is described below
commit 4fb60ee36972f46c570864f1ecd01a373fcf7c7b
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Nov 6 07:58:19 2018 +0100
Add healthcheck HTTP endpoint (#2883)
* Add healthcheck HTTP endpoint
Triggering the endpoint causes a client to try to produce and consume
messages on the local broker.
* Async
* Add the test class to the suite list (registering each class by hand is
cumbersome and makes it easy for integration tests to silently not be run).
Also disable the one test that depends on admin client timeouts.
* typo
* Move timeout to outside read loop
---
.../pulsar/broker/admin/impl/BrokersBase.java | 101 ++++++++++++++++++
.../org/apache/pulsar/client/admin/Brokers.java | 7 ++
.../pulsar/client/admin/internal/BrokersImpl.java | 11 ++
.../org/apache/pulsar/admin/cli/CmdBrokers.java | 14 ++-
.../tests/integration/cli/HealthcheckTest.java | 115 +++++++++++++++++++++
.../integration/topologies/PulsarCluster.java | 24 +++++
.../src/test/resources/pulsar-process.xml | 3 +-
7 files changed, 273 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5925108..c4d504c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,11 +23,18 @@ import static
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -35,8 +42,15 @@ import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -197,4 +211,91 @@ public class BrokersBase extends AdminResource {
pulsar().getWorkerConfig().map(wc ->
wc.getStateStorageServiceUrl()).orElse(null));
}
+ @GET
+ @Path("/health")
+ @ApiOperation(value = "Run a healthcheck against the broker")
+ @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is
OK"),
+ @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Cluster
doesn't exist") })
+ public void healthcheck(@Suspended AsyncResponse asyncResponse) throws
Exception {
+ validateSuperUserAccess();
+ String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
+ pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
+ String topic = String.format("persistent://%s/healthcheck",
heartbeatNamespace);
+
+ PulsarClient client = pulsar().getClient();
+
+ String messageStr = UUID.randomUUID().toString();
+ CompletableFuture<Producer<String>> producerFuture =
+ client.newProducer(Schema.STRING).topic(topic).createAsync();
+ CompletableFuture<Reader<String>> readerFuture =
client.newReader(Schema.STRING)
+ .topic(topic).startMessageId(MessageId.latest).createAsync();
+
+ CompletableFuture<Void> completePromise = new CompletableFuture<>();
+
+ CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
+ (ignore, exception) -> {
+ if (exception != null) {
+ completePromise.completeExceptionally(exception);
+ } else {
+ producerFuture.thenCompose((producer) ->
producer.sendAsync(messageStr))
+ .whenComplete((ignore2, exception2) -> {
+ if (exception2 != null) {
+
completePromise.completeExceptionally(exception2);
+ }
+ });
+
+ healthcheckReadLoop(readerFuture, completePromise,
messageStr);
+
+ // timeout read loop after 10 seconds
+ ScheduledFuture<?> timeout =
pulsar().getExecutor().schedule(() -> {
+ completePromise.completeExceptionally(new
TimeoutException("Timed out reading"));
+ }, 10, TimeUnit.SECONDS);
+ // don't leave timeout dangling
+ completePromise.whenComplete((ignore2, exception2) -> {
+ timeout.cancel(false);
+ });
+ }
+ });
+
+ completePromise.whenComplete((ignore, exception) -> {
+ producerFuture.thenAccept((producer) -> {
+ producer.closeAsync().whenComplete((ignore2,
exception2) -> {
+ if (exception2 != null) {
+ LOG.warn("Error closing producer for
healthcheck", exception2);
+ }
+ });
+ });
+ readerFuture.thenAccept((reader) -> {
+ reader.closeAsync().whenComplete((ignore2, exception2)
-> {
+ if (exception2 != null) {
+ LOG.warn("Error closing reader for
healthcheck", exception2);
+ }
+ });
+ });
+ if (exception != null) {
+ asyncResponse.resume(new RestException(exception));
+ } else {
+ asyncResponse.resume("ok");
+ }
+ });
+ }
+
+ private void healthcheckReadLoop(CompletableFuture<Reader<String>>
readerFuture,
+ CompletableFuture<?> completablePromise,
+ String messageStr) {
+ readerFuture.thenAccept((reader) -> {
+ CompletableFuture<Message<String>> readFuture =
reader.readNextAsync()
+ .whenComplete((m, exception) -> {
+ if (exception != null) {
+
completablePromise.completeExceptionally(exception);
+ } else if (m.getValue().equals(messageStr)) {
+ completablePromise.complete(null);
+ } else {
+ healthcheckReadLoop(readerFuture,
completablePromise, messageStr);
+ }
+ });
+ });
+ }
}
+
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index a6fadd1..50503c1 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -105,4 +105,11 @@ public interface Brokers {
* @return internal configuration data.
*/
InternalConfigurationData getInternalConfigurationData() throws
PulsarAdminException;
+
+ /**
+ * Run a healthcheck on the broker.
+ *
+ * @throws PulsarAdminException if the healthcheck fails.
+ */
+ void healthcheck() throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 52270d3..969f673 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -101,4 +101,15 @@ public class BrokersImpl extends BaseResource implements
Brokers {
}
}
+ @Override
+ public void healthcheck() throws PulsarAdminException {
+ try {
+ String result =
request(adminBrokers.path("/health")).get(String.class);
+ if (!result.trim().toLowerCase().equals("ok")) {
+ throw new PulsarAdminException("Healthcheck returned
unexpected result: " + result);
+ }
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 9dc50a0..9a93474 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -92,7 +92,18 @@ public class CmdBrokers extends CmdBase {
}
}
-
+
+ @Parameters(commandDescription = "Run a health check against the broker")
+ private class HealthcheckCmd extends CliCommand {
+
+ @Override
+ void run() throws Exception {
+ admin.brokers().healthcheck();
+ System.out.println("ok");
+ }
+
+ }
+
public CmdBrokers(PulsarAdmin admin) {
super("brokers", admin);
jcommander.addCommand("list", new List());
@@ -101,5 +112,6 @@ public class CmdBrokers extends CmdBase {
jcommander.addCommand("list-dynamic-config", new
GetUpdatableConfigCmd());
jcommander.addCommand("get-all-dynamic-config", new
GetAllConfigurationsCmd());
jcommander.addCommand("get-internal-config", new
GetInternalConfigurationCmd());
+ jcommander.addCommand("healthcheck", new HealthcheckCmd());
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
new file mode 100644
index 0000000..7c74123
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.pulsar.tests.integration.containers.BKContainer;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the healthcheck command.
+ */
+public class HealthcheckTest {
+ private final static Logger log =
LoggerFactory.getLogger(HealthcheckTest.class);
+ private final PulsarClusterSpec spec = PulsarClusterSpec.builder()
+ .clusterName("HealthcheckTest-" +
UUID.randomUUID().toString().substring(0, 8))
+ .numProxies(0).numFunctionWorkers(0).enablePrestoWorker(false).build();
+ private PulsarCluster pulsarCluster = null;
+
+ @BeforeMethod
+ public void setupCluster() throws Exception {
+ pulsarCluster = PulsarCluster.forSpec(spec);
+ pulsarCluster.start();
+ }
+
+ @AfterMethod
+ public void tearDownCluster() throws Exception {
+ if (pulsarCluster != null) {
+ pulsarCluster.stop();
+ pulsarCluster = null;
+ }
+ }
+
+ @Test
+ public void testEverythingOK() throws Exception {
+ for (BrokerContainer b : pulsarCluster.getBrokers()) {
+ ContainerExecResult result = b.execCmd(PulsarCluster.ADMIN_SCRIPT,
"brokers", "healthcheck");
+ Assert.assertEquals(result.getExitCode(), 0);
+ Assert.assertEquals(result.getStdout().trim(), "ok");
+ }
+ }
+
+ private void assertHealthcheckFailure() throws Exception {
+ for (BrokerContainer b : pulsarCluster.getBrokers()) {
+ try {
+ b.execCmd(PulsarCluster.ADMIN_SCRIPT, "brokers",
"healthcheck");
+ Assert.fail("Should always fail");
+ } catch (ContainerExecException e) {
+ Assert.assertEquals(e.getResult().getExitCode(), 1);
+ }
+ }
+ }
+
+ @Test
+ public void testZooKeeperDown() throws Exception {
+ pulsarCluster.getZooKeeper().execCmd("pkill", "-STOP", "-f",
"ZooKeeperStarter");
+ assertHealthcheckFailure();
+ }
+
+ // Disabled until PulsarAdmin can time out (#2891)
+ // @Test
+ // public void testBrokerDown() throws Exception {
+ // for (BrokerContainer b : pulsarCluster.getBrokers()) {
+ // b.execCmd("pkill", "-STOP", "-f", "PulsarBrokerStarter");
+ // }
+ // assertHealthcheckFailure();
+ // }
+
+ @Test
+ public void testBookKeeperDown() throws Exception {
+ for (BKContainer b : pulsarCluster.getBookies()) {
+ b.execCmd("pkill", "-STOP", "-f", "BookieServer");
+ }
+ assertHealthcheckFailure();
+ }
+
+ private static Map<String, String> parseOutput(String output) throws
Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(output, new TypeReference<HashMap<String,
String>>() {});
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 7f9986e..bbb5bd4 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -448,6 +448,14 @@ public class PulsarCluster {
return brokerContainers.values();
}
+ public Collection<BKContainer> getBookies() {
+ return bookieContainers.values();
+ }
+
+ public ZKContainer getZooKeeper() {
+ return zkContainer;
+ }
+
public ContainerExecResult runAdminCommandOnAnyBroker(String...commands)
throws Exception {
return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
}
@@ -472,6 +480,22 @@ public class PulsarCluster {
brokerContainers.values().forEach(BrokerContainer::start);
}
+ public void stopAllBookies() {
+ bookieContainers.values().forEach(BKContainer::stop);
+ }
+
+ public void startAllBookies() {
+ bookieContainers.values().forEach(BKContainer::start);
+ }
+
+ public void stopZooKeeper() {
+ zkContainer.stop();
+ }
+
+ public void startZooKeeper() {
+ zkContainer.start();
+ }
+
public ContainerExecResult createNamespace(String nsName) throws Exception
{
return runAdminCommandOnAnyBroker(
"namespaces", "create", "public/" + nsName,
diff --git a/tests/integration/src/test/resources/pulsar-process.xml
b/tests/integration/src/test/resources/pulsar-process.xml
index e29a05e..946c2f9 100644
--- a/tests/integration/src/test/resources/pulsar-process.xml
+++ b/tests/integration/src/test/resources/pulsar-process.xml
@@ -26,6 +26,7 @@
<class
name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
<class
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest"
/>
<class
name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" />
+ <class
name="org.apache.pulsar.tests.integration.cli.HealthcheckTest" />
</classes>
</test>
-</suite>
\ No newline at end of file
+</suite>