This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fb60ee  Add healthcheck HTTP endpoint (#2883)
4fb60ee is described below

commit 4fb60ee36972f46c570864f1ecd01a373fcf7c7b
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Nov 6 07:58:19 2018 +0100

    Add healthcheck HTTP endpoint (#2883)
    
    * Add healthcheck HTTP endpoint
    
    Triggering the endpoint causes a client to try to produce and consume
    messages on the local broker.
    
    * Async
    
    * Add Suite to suite list (this is super cumbersome and will result in integration tests not being run)
    
    also, disable the one test that depends on admin client timeouts
    
    * typo
    
    * Move timeout to outside read loop
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 101 ++++++++++++++++++
 .../org/apache/pulsar/client/admin/Brokers.java    |   7 ++
 .../pulsar/client/admin/internal/BrokersImpl.java  |  11 ++
 .../org/apache/pulsar/admin/cli/CmdBrokers.java    |  14 ++-
 .../tests/integration/cli/HealthcheckTest.java     | 115 +++++++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |  24 +++++
 .../src/test/resources/pulsar-process.xml          |   3 +-
 7 files changed, 273 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5925108..c4d504c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,11 +23,18 @@ import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -35,8 +42,15 @@ import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -197,4 +211,91 @@ public class BrokersBase extends AdminResource {
             pulsar().getWorkerConfig().map(wc -> 
wc.getStateStorageServiceUrl()).orElse(null));
     }
 
+    @GET
+    @Path("/health")
+    @ApiOperation(value = "Run a healthcheck against the broker")
+    @ApiResponses(value = { @ApiResponse(code = 200, message = "Everything is 
OK"),
+                            @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Cluster 
doesn't exist") })
+    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws 
Exception {
+        validateSuperUserAccess();
+        String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
+                pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
+        String topic = String.format("persistent://%s/healthcheck", 
heartbeatNamespace);
+
+        PulsarClient client = pulsar().getClient();
+
+        String messageStr = UUID.randomUUID().toString();
+        CompletableFuture<Producer<String>> producerFuture =
+            client.newProducer(Schema.STRING).topic(topic).createAsync();
+        CompletableFuture<Reader<String>> readerFuture = 
client.newReader(Schema.STRING)
+            .topic(topic).startMessageId(MessageId.latest).createAsync();
+
+        CompletableFuture<Void> completePromise = new CompletableFuture<>();
+
+        CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
+                (ignore, exception) -> {
+                    if (exception != null) {
+                        completePromise.completeExceptionally(exception);
+                    } else {
+                        producerFuture.thenCompose((producer) -> 
producer.sendAsync(messageStr))
+                            .whenComplete((ignore2, exception2) -> {
+                                    if (exception2 != null) {
+                                        
completePromise.completeExceptionally(exception2);
+                                    }
+                                });
+
+                        healthcheckReadLoop(readerFuture, completePromise, 
messageStr);
+
+                        // timeout read loop after 10 seconds
+                        ScheduledFuture<?> timeout = 
pulsar().getExecutor().schedule(() -> {
+                                completePromise.completeExceptionally(new 
TimeoutException("Timed out reading"));
+                            }, 10, TimeUnit.SECONDS);
+                        // don't leave timeout dangling
+                        completePromise.whenComplete((ignore2, exception2) -> {
+                                timeout.cancel(false);
+                            });
+                    }
+                });
+
+        completePromise.whenComplete((ignore, exception) -> {
+                producerFuture.thenAccept((producer) -> {
+                        producer.closeAsync().whenComplete((ignore2, 
exception2) -> {
+                                if (exception2 != null) {
+                                    LOG.warn("Error closing producer for 
healthcheck", exception2);
+                                }
+                            });
+                    });
+                readerFuture.thenAccept((reader) -> {
+                        reader.closeAsync().whenComplete((ignore2, exception2) 
-> {
+                                if (exception2 != null) {
+                                    LOG.warn("Error closing reader for 
healthcheck", exception2);
+                                }
+                            });
+                    });
+                if (exception != null) {
+                    asyncResponse.resume(new RestException(exception));
+                } else {
+                    asyncResponse.resume("ok");
+                }
+            });
+    }
+
+    private void healthcheckReadLoop(CompletableFuture<Reader<String>> 
readerFuture,
+                                     CompletableFuture<?> completablePromise,
+                                     String messageStr) {
+        readerFuture.thenAccept((reader) -> {
+                CompletableFuture<Message<String>> readFuture = 
reader.readNextAsync()
+                    .whenComplete((m, exception) -> {
+                            if (exception != null) {
+                                
completablePromise.completeExceptionally(exception);
+                            } else if (m.getValue().equals(messageStr)) {
+                                completablePromise.complete(null);
+                            } else {
+                                healthcheckReadLoop(readerFuture, 
completablePromise, messageStr);
+                            }
+                        });
+            });
+    }
 }
+
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index a6fadd1..50503c1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -105,4 +105,11 @@ public interface Brokers {
      * @return internal configuration data.
      */
     InternalConfigurationData getInternalConfigurationData() throws 
PulsarAdminException;
+
+    /**
+     * Run a healthcheck on the broker.
+     *
+     * @throws PulsarAdminException if the healthcheck fails.
+     */
+    void healthcheck() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 52270d3..969f673 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -101,4 +101,15 @@ public class BrokersImpl extends BaseResource implements 
Brokers {
         }
     }
 
+    @Override
+    public void healthcheck() throws PulsarAdminException {
+        try {
+            String result = 
request(adminBrokers.path("/health")).get(String.class);
+            if (!result.trim().toLowerCase().equals("ok")) {
+                throw new PulsarAdminException("Healthcheck returned 
unexpected result: " + result);
+            }
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 9dc50a0..9a93474 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -92,7 +92,18 @@ public class CmdBrokers extends CmdBase {
         }
 
     }
-    
+
+    @Parameters(commandDescription = "Run a health check against the broker")
+    private class HealthcheckCmd extends CliCommand {
+
+        @Override
+        void run() throws Exception {
+            admin.brokers().healthcheck();
+            System.out.println("ok");
+        }
+
+    }
+
     public CmdBrokers(PulsarAdmin admin) {
         super("brokers", admin);
         jcommander.addCommand("list", new List());
@@ -101,5 +112,6 @@ public class CmdBrokers extends CmdBase {
         jcommander.addCommand("list-dynamic-config", new 
GetUpdatableConfigCmd());
         jcommander.addCommand("get-all-dynamic-config", new 
GetAllConfigurationsCmd());
         jcommander.addCommand("get-internal-config", new 
GetInternalConfigurationCmd());
+        jcommander.addCommand("healthcheck", new HealthcheckCmd());
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
new file mode 100644
index 0000000..7c74123
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/HealthcheckTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.pulsar.tests.integration.containers.BKContainer;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the healthcheck command.
+ */
+public class HealthcheckTest {
+    private final static Logger log = 
LoggerFactory.getLogger(HealthcheckTest.class);
+    private final PulsarClusterSpec spec = PulsarClusterSpec.builder()
+        .clusterName("HealthcheckTest-" + 
UUID.randomUUID().toString().substring(0, 8))
+        .numProxies(0).numFunctionWorkers(0).enablePrestoWorker(false).build();
+    private PulsarCluster pulsarCluster = null;
+
+    @BeforeMethod
+    public void setupCluster() throws Exception {
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+    }
+
+    @AfterMethod
+    public void tearDownCluster() throws Exception {
+        if (pulsarCluster != null) {
+            pulsarCluster.stop();
+            pulsarCluster = null;
+        }
+    }
+
+    @Test
+    public void testEverythingOK() throws Exception {
+        for (BrokerContainer b : pulsarCluster.getBrokers()) {
+            ContainerExecResult result = b.execCmd(PulsarCluster.ADMIN_SCRIPT, 
"brokers", "healthcheck");
+            Assert.assertEquals(result.getExitCode(), 0);
+            Assert.assertEquals(result.getStdout().trim(), "ok");
+        }
+    }
+
+    private void assertHealthcheckFailure() throws Exception {
+        for (BrokerContainer b : pulsarCluster.getBrokers()) {
+            try {
+                b.execCmd(PulsarCluster.ADMIN_SCRIPT, "brokers", 
"healthcheck");
+                Assert.fail("Should always fail");
+            } catch (ContainerExecException e) {
+                Assert.assertEquals(e.getResult().getExitCode(), 1);
+            }
+        }
+    }
+
+    @Test
+    public void testZooKeeperDown() throws Exception {
+        pulsarCluster.getZooKeeper().execCmd("pkill", "-STOP", "-f", 
"ZooKeeperStarter");
+        assertHealthcheckFailure();
+    }
+
+    // Disabled until PulsarAdmin can time out (#2891)
+    // @Test
+    // public void testBrokerDown() throws Exception {
+    //     for (BrokerContainer b : pulsarCluster.getBrokers()) {
+    //         b.execCmd("pkill", "-STOP", "-f", "PulsarBrokerStarter");
+    //     }
+    //     assertHealthcheckFailure();
+    // }
+
+    @Test
+    public void testBookKeeperDown() throws Exception {
+        for (BKContainer b : pulsarCluster.getBookies()) {
+            b.execCmd("pkill", "-STOP", "-f", "BookieServer");
+        }
+        assertHealthcheckFailure();
+    }
+
+    private static Map<String, String> parseOutput(String output) throws 
Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(output, new TypeReference<HashMap<String, 
String>>() {});
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 7f9986e..bbb5bd4 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -448,6 +448,14 @@ public class PulsarCluster {
         return brokerContainers.values();
     }
 
+    public Collection<BKContainer> getBookies() {
+        return bookieContainers.values();
+    }
+
+    public ZKContainer getZooKeeper() {
+        return zkContainer;
+    }
+
     public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) 
throws Exception {
         return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
     }
@@ -472,6 +480,22 @@ public class PulsarCluster {
         brokerContainers.values().forEach(BrokerContainer::start);
     }
 
+    public void stopAllBookies() {
+        bookieContainers.values().forEach(BKContainer::stop);
+    }
+
+    public void startAllBookies() {
+        bookieContainers.values().forEach(BKContainer::start);
+    }
+
+    public void stopZooKeeper() {
+        zkContainer.stop();
+    }
+
+    public void startZooKeeper() {
+        zkContainer.start();
+    }
+
     public ContainerExecResult createNamespace(String nsName) throws Exception 
{
         return runAdminCommandOnAnyBroker(
             "namespaces", "create", "public/" + nsName,
diff --git a/tests/integration/src/test/resources/pulsar-process.xml 
b/tests/integration/src/test/resources/pulsar-process.xml
index e29a05e..946c2f9 100644
--- a/tests/integration/src/test/resources/pulsar-process.xml
+++ b/tests/integration/src/test/resources/pulsar-process.xml
@@ -26,6 +26,7 @@
             <class 
name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
             <class 
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest" 
/>
             <class 
name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" />
+            <class 
name="org.apache.pulsar.tests.integration.cli.HealthcheckTest" />
         </classes>
     </test>
-</suite>
\ No newline at end of file
+</suite>

Reply via email to