This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 44e0663  Utilities for arquillian integration tests (#1310)
44e0663 is described below

commit 44e06635c1524229a923e8fbb525df278fcecdec
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Sat Mar 3 02:17:32 2018 +0100

    Utilities for arquillian integration tests (#1310)
    
    * Utilities for arquillian integration tests
    
    A set of utilities for working with clusters set up in arquillian.
    
    - Utilities to work with docker
    - Utilities to work with arquillian pulsar clusters
    - Await strategies that arquillian uses to know if a node is up
    - Stop actions to copy logs at the end of a test suite
    
    * Move to log4j2
    
    * Pull in extra dependencies for log4j yml configuration
---
 tests/integration-tests-utils/pom.xml              |  84 +++++++
 .../java/org/apache/pulsar/tests/DockerUtils.java  | 220 +++++++++++++++++
 .../pulsar/tests/LogToTargetDirStopAction.java     |  41 ++++
 .../org/apache/pulsar/tests/NoopAwaitStrategy.java |  28 +++
 .../apache/pulsar/tests/PulsarClusterUtils.java    | 269 +++++++++++++++++++++
 .../tests/PulsarLogsToTargetDirStopAction.java     |  43 ++++
 .../pulsar/tests/ZooKeeperAwaitStrategy.java       |  54 +++++
 .../src/main/resources/log4j2.yml                  |  41 ++++
 tests/pom.xml                                      |   1 +
 9 files changed, 781 insertions(+)

diff --git a/tests/integration-tests-utils/pom.xml 
b/tests/integration-tests-utils/pom.xml
new file mode 100644
index 0000000..c6d74c8
--- /dev/null
+++ b/tests/integration-tests-utils/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="
+  http://maven.apache.org/POM/4.0.0
+  http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar.tests</groupId>
+    <artifactId>tests-parent</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.tests</groupId>
+  <artifactId>integration-tests-utils</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Pulsar :: Tests :: Utility module for Arquillian based integration tests</name>
+
+  <properties>
+    <arquillian-cube.version>1.15.1</arquillian-cube.version>
+    <commons-compress.version>1.15</commons-compress.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>${commons-compress.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.arquillian.cube</groupId>
+      <artifactId>arquillian-cube-docker</artifactId>
+      <version>${arquillian-cube.version}</version>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
new file mode 100644
index 0000000..fb69dc7
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.async.ResultCallback;
+import com.github.dockerjava.api.command.InspectExecResponse;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.api.model.ContainerNetwork;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DockerUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DockerUtils.class);
+
+    /**
+     * Resolves, creating it when missing, the directory that receives the log files
+     * of one container: {@code <base>/container-logs/<containerId>}, where the base is
+     * the {@code maven.buildDirectory} system property or {@code target} when unset.
+     *
+     * @param containerId id of the container; used as the leaf directory name
+     * @return the directory; it is returned even when it could not be created
+     *         (an error is logged in that case)
+     */
+    private static File getTargetDirectory(String containerId) {
+        String buildDir = System.getProperty("maven.buildDirectory", "target");
+        File logDir = new File(buildDir + "/container-logs/" + containerId);
+        if (!logDir.exists()) {
+            boolean created = logDir.mkdirs();
+            if (!created) {
+                LOG.error("Error creating directory for container logs.");
+            }
+        }
+        return logDir;
+    }
+
+    /**
+     * Writes the stdout and stderr log of a container, with timestamps, to
+     * {@code docker.log} inside the container's log directory.
+     *
+     * Blocks until the log stream is reported complete or failed. Failures are
+     * logged here and never propagated to the caller.
+     *
+     * @param docker client used to fetch the log
+     * @param containerId id of the container whose log is dumped
+     */
+    public static void dumpContainerLogToTarget(DockerClient docker, String containerId) {
+        File logFile = new File(getTargetDirectory(containerId), "docker.log");
+        try (FileOutputStream out = new FileOutputStream(logFile)) {
+            CompletableFuture<Boolean> done = new CompletableFuture<>();
+            ResultCallback<Frame> writer = new ResultCallback<Frame>() {
+                @Override
+                public void close() {}
+
+                @Override
+                public void onStart(Closeable closeable) {}
+
+                @Override
+                public void onNext(Frame frame) {
+                    try {
+                        out.write(frame.getPayload());
+                    } catch (IOException e) {
+                        // a failed write is reported the same way as a stream error
+                        onError(e);
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    done.completeExceptionally(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    done.complete(true);
+                }
+            };
+            docker.logContainerCmd(containerId)
+                .withStdOut(true)
+                .withStdErr(true)
+                .withTimestamps(true)
+                .exec(writer);
+            // keep the output stream open until the whole log has been delivered
+            done.get();
+        } catch (RuntimeException|ExecutionException|IOException e) {
+            LOG.error("Error dumping log for {}", containerId, e);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            LOG.info("Interrupted dumping log from container {}", containerId, ie);
+        }
+    }
+
+    /**
+     * Copies every regular file found under {@code path} in a container into the
+     * container's log directory.
+     *
+     * The content is fetched as a tar archive. Each file is written flat, with the
+     * {@code /} separators of its archive name replaced by {@code -}. Failures are
+     * logged here and never propagated to the caller.
+     *
+     * @param docker client used to fetch the archive
+     * @param containerId id of the container to copy from
+     * @param path path inside the container to copy
+     */
+    public static void dumpContainerLogDirToTarget(DockerClient docker, String containerId, String path) {
+        final int bufferSize = 10000;
+
+        try (InputStream archive = docker.copyArchiveFromContainerCmd(containerId, path).exec();
+             TarArchiveInputStream tar = new TarArchiveInputStream(archive)) {
+            for (TarArchiveEntry entry = tar.getNextTarEntry();
+                 entry != null;
+                 entry = tar.getNextTarEntry()) {
+                if (!entry.isFile()) {
+                    continue;
+                }
+                File target = new File(getTargetDirectory(containerId), entry.getName().replace("/", "-"));
+                try (FileOutputStream out = new FileOutputStream(target)) {
+                    byte[] buffer = new byte[bufferSize];
+                    int count;
+                    // reading from the tar stream stops at the end of the current entry
+                    while ((count = tar.read(buffer, 0, bufferSize)) > -1) {
+                        out.write(buffer, 0, count);
+                    }
+                }
+            }
+        } catch (RuntimeException|IOException e) {
+            LOG.error("Error reading logs from container {}", containerId, e);
+        }
+    }
+
+    /**
+     * Returns the IP address of a container on the first network listed in its
+     * inspected network settings.
+     *
+     * @param docker client used to inspect the container
+     * @param containerId id of the container
+     * @return the IP address reported for that network
+     * @throws IllegalArgumentException if the container has no networks
+     */
+    public static String getContainerIP(DockerClient docker, String containerId) {
+        Map<String, ContainerNetwork> networks =
+            docker.inspectContainerCmd(containerId).exec().getNetworkSettings().getNetworks();
+        if (networks.isEmpty()) {
+            throw new IllegalArgumentException("Container " + containerId + " has no networks");
+        }
+        return networks.values().iterator().next().getIpAddress();
+    }
+
+    /**
+     * Returns the hostname of a container, obtained by running {@code hostname}
+     * inside it and trimming the output.
+     *
+     * @param docker client used to run the command
+     * @param containerId id of the container
+     * @return the trimmed output of {@code hostname}
+     */
+    public static String getContainerHostname(DockerClient docker, String containerId) {
+        String rawOutput = runCommand(docker, containerId, "hostname");
+        return rawOutput.trim();
+    }
+
+    /**
+     * Runs a command inside a container and returns everything it wrote to the
+     * attached stdout and stderr.
+     *
+     * Blocks until the output stream completes and the exec is no longer reported
+     * as running. Output is decoded as UTF-8 rather than with the platform default
+     * charset, so the result does not depend on the JVM's locale settings.
+     *
+     * @param docker client used to create and start the exec
+     * @param containerId id of the container to run in
+     * @param cmd command and arguments
+     * @return the collected output
+     * @throws RuntimeException if the command exits with a non-zero code, or the
+     *         calling thread is interrupted while waiting for the exec to finish
+     * @throws java.util.concurrent.CompletionException if the output stream
+     *         reports an error
+     */
+    public static String runCommand(DockerClient docker, String containerId, String... cmd) {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        String execid = docker.execCreateCmd(containerId).withCmd(cmd)
+            .withAttachStderr(true).withAttachStdout(true).exec().getId();
+        String cmdString = String.join(" ", cmd);
+        // StringBuffer is synchronized; kept in case frames are delivered on another thread
+        StringBuffer output = new StringBuffer();
+        docker.execStartCmd(execid).withDetach(false)
+            .exec(new ResultCallback<Frame>() {
+                @Override
+                public void close() {}
+
+                @Override
+                public void onStart(Closeable closeable) {
+                    LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+                }
+
+                @Override
+                public void onNext(Frame object) {
+                    LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object);
+                    output.append(new String(object.getPayload(),
+                                             java.nio.charset.StandardCharsets.UTF_8));
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    future.completeExceptionally(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+                    future.complete(true);
+                }
+            });
+        future.join();
+
+        // the stream can finish before the exec is marked as exited, so poll for it
+        InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+        while (resp.isRunning()) {
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(ie);
+            }
+            resp = docker.inspectExecCmd(execid).exec();
+        }
+        int retCode = resp.getExitCode();
+        if (retCode != 0) {
+            throw new RuntimeException(
+                    String.format("cmd(%s) failed on %s with exitcode %d",
+                                  cmdString, containerId, retCode));
+        } else {
+            LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+        }
+        return output.toString();
+    }
+
+    /**
+     * Looks up the value of the {@code cluster} label on a container.
+     *
+     * @param docker client used to inspect the container
+     * @param containerId id of the container
+     * @return the label value, or empty when the container has no such label
+     */
+    public static Optional<String> getContainerCluster(DockerClient docker, String containerId) {
+        Map<String, String> labels = docker.inspectContainerCmd(containerId).exec().getConfig().getLabels();
+        return Optional.ofNullable(labels.get("cluster"));
+    }
+
+    /**
+     * Matches system property names of the form {@code arq.cube.docker.<id>.ip}.
+     * The dots are escaped so they only match a literal '.', not any character.
+     */
+    private static final Pattern CUBE_IP_PROPERTY =
+        Pattern.compile("^arq\\.cube\\.docker\\.([^.]*)\\.ip$");
+
+    /**
+     * Collects the cube ids found in the JVM system properties.
+     *
+     * A cube id is the {@code <id>} part of any property named
+     * {@code arq.cube.docker.<id>.ip}.
+     *
+     * @return the set of ids; empty when no property matches
+     */
+    public static Set<String> allCubeIds() {
+        return System.getProperties().keySet().stream()
+            .map(k -> CUBE_IP_PROPERTY.matcher(k.toString()))
+            .filter(m -> m.matches())
+            .map(m -> m.group(1))
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the ids of all known cubes whose container carries every one of the
+     * given labels with the given value.
+     *
+     * A container with no label map, or with a null value for a requested label,
+     * is treated as not matching instead of causing a NullPointerException.
+     * NOTE(review): whether getLabels() can actually return null should be
+     * confirmed against the docker-java version in use.
+     *
+     * @param docker client used to inspect each container
+     * @param labels label names and values that must all be present; an empty map
+     *        matches every cube
+     * @return the ids of the matching cubes
+     */
+    public static Set<String> cubeIdsWithLabels(DockerClient docker, Map<String,String> labels) {
+        return allCubeIds().stream()
+            .filter(id -> {
+                    Map<String,String> configuredLabels =
+                        docker.inspectContainerCmd(id).exec().getConfig().getLabels();
+                    return labels.entrySet().stream().allMatch(wanted -> {
+                            if (configuredLabels == null) {
+                                return false;
+                            }
+                            String actual = configuredLabels.get(wanted.getKey());
+                            return actual != null && actual.equals(wanted.getValue());
+                        });
+                })
+            .collect(Collectors.toSet());
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java
new file mode 100644
index 0000000..f74ce47
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.arquillian.cube.docker.impl.docker.DockerClientExecutor;
+import org.arquillian.cube.impl.model.CubeId;
+import org.arquillian.cube.spi.beforeStop.BeforeStopAction;
+
+/**
+ * Before-stop action that dumps the docker log of the container to the build
+ * output, using {@link DockerUtils#dumpContainerLogToTarget}.
+ *
+ * NOTE(review): the two setters are presumably called by arquillian cube before
+ * {@link #doBeforeStop()} - confirm against the BeforeStopAction contract.
+ */
+public class LogToTargetDirStopAction implements BeforeStopAction {
+    private DockerClientExecutor dockerClientExecutor;
+    private CubeId containerID;
+
+    /** Sets the executor that provides the docker client. */
+    public void setDockerClientExecutor(DockerClientExecutor executor) {
+        dockerClientExecutor = executor;
+    }
+
+    /** Sets the id of the cube whose log is dumped. */
+    public void setContainerID(CubeId containerID) {
+        this.containerID = containerID;
+    }
+
+    /** Dumps the container's docker log before the container is stopped. */
+    @Override
+    public void doBeforeStop() {
+        String id = containerID.getId();
+        DockerUtils.dumpContainerLogToTarget(dockerClientExecutor.getDockerClient(), id);
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java
new file mode 100644
index 0000000..d9ef56c
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.arquillian.cube.spi.await.AwaitStrategy;
+
+/**
+ * Await strategy that does not wait: {@link #await()} reports success immediately.
+ */
+public class NoopAwaitStrategy implements AwaitStrategy {
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean await() {
+        return true;
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
new file mode 100644
index 0000000..c5b8e70
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.github.dockerjava.api.DockerClient;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.net.Socket;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarClusterUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarClusterUtils.class);
+    static final short BROKER_PORT = 8080;
+
+    /**
+     * Builds a ZooKeeper connect string for a cluster from the IP addresses of
+     * all cubes labelled {@code service=zookeeper} and {@code cluster=<cluster>}.
+     *
+     * Hosts are separated by commas, as the ZooKeeper client expects. No port is
+     * appended, so the client's default port applies.
+     *
+     * @param docker client used to find and inspect the containers
+     * @param cluster value of the {@code cluster} label to select
+     * @return comma-separated host list; empty when no container matches
+     */
+    public static String zookeeperConnectString(DockerClient docker, String cluster) {
+        Set<String> zkNodes = DockerUtils.cubeIdsWithLabels(
+                docker, ImmutableMap.of("service", "zookeeper", "cluster", cluster));
+        return zkNodes.stream()
+            .map((id) -> DockerUtils.getContainerIP(docker, id))
+            .collect(Collectors.joining(","));
+    }
+
+    /**
+     * Creates a ZooKeeper client for a cluster and waits until it is connected.
+     *
+     * The wait has no timeout: the call blocks until the client reports
+     * {@code SyncConnected}. If the wait fails, for example because the calling
+     * thread is interrupted, the client is closed before the exception is
+     * rethrown so that it is not leaked.
+     *
+     * @param docker client used to locate the zookeeper containers
+     * @param cluster value of the {@code cluster} label to select
+     * @return a connected client; the caller is responsible for closing it
+     * @throws Exception if the client cannot be created or the wait fails
+     */
+    public static ZooKeeper zookeeperClient(DockerClient docker, String cluster) throws Exception {
+        String connectString = zookeeperConnectString(docker, cluster);
+        LOG.info("Connecting to zookeeper {}", connectString);
+        CompletableFuture<Void> connected = new CompletableFuture<>();
+        ZooKeeper zk = new ZooKeeper(connectString, 10000,
+                                     (e) -> {
+                                         if (e.getState() == KeeperState.SyncConnected) {
+                                             connected.complete(null);
+                                         }
+                                     });
+        try {
+            connected.get();
+        } catch (Exception e) {
+            // do not leak the client when the wait is abandoned
+            try {
+                zk.close();
+            } catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        return zk;
+    }
+
+    /**
+     * Checks whether the ZooKeeper server in a container answers the {@code ruok}
+     * command with {@code imok} on port 2181.
+     *
+     * The reply is read in a loop until all four bytes have arrived, because a
+     * single read may return fewer bytes than requested. Each read waits at most
+     * one second.
+     *
+     * @param docker client used to look up the container's IP address
+     * @param containerId id of the zookeeper container
+     * @return {@code true} only if the full reply is {@code imok}; {@code false}
+     *         on any I/O error, timeout or short reply
+     */
+    public static boolean zookeeperRunning(DockerClient docker, String containerId) {
+        String ip = DockerUtils.getContainerIP(docker, containerId);
+        try (Socket socket = new Socket(ip, 2181)) {
+            socket.setSoTimeout(1000);
+            socket.getOutputStream().write("ruok".getBytes(UTF_8));
+            byte[] resp = new byte[4];
+            int filled = 0;
+            while (filled < resp.length) {
+                int n = socket.getInputStream().read(resp, filled, resp.length - filled);
+                if (n < 0) {
+                    // connection closed before the full reply arrived
+                    return false;
+                }
+                filled += n;
+            }
+            return "imok".equals(new String(resp, UTF_8));
+        } catch (IOException e) {
+            // not reachable or not answering in time: report as not running
+        }
+        return false;
+    }
+
+    /**
+     * Runs a command on one broker of a cluster, chosen arbitrarily among the
+     * cubes labelled {@code service=pulsar-broker} and {@code cluster=<cluster>}.
+     *
+     * @param docker client used to find the broker and run the command
+     * @param cluster value of the {@code cluster} label to select
+     * @param cmds command and arguments
+     * @return {@code true} if a broker was found and the command ran;
+     *         {@code false} if the cluster has no broker
+     * @throws Exception declared for callers; a failing command surfaces as the
+     *         exception thrown by {@code DockerUtils.runCommand}
+     */
+    public static boolean runOnAnyBroker(DockerClient docker, String cluster, String... cmds) throws Exception {
+        Set<String> brokers = DockerUtils.cubeIdsWithLabels(
+                docker, ImmutableMap.of("service", "pulsar-broker", "cluster", cluster));
+        Optional<String> chosen = brokers.stream().findAny();
+        if (!chosen.isPresent()) {
+            return false;
+        }
+        DockerUtils.runCommand(docker, chosen.get(), cmds);
+        return true;
+    }
+
+    /**
+     * Runs a command on every broker of a cluster, one broker after another.
+     *
+     * @param docker client used to find the brokers and run the command
+     * @param cluster value of the {@code cluster} label to select
+     * @param cmds command and arguments
+     * @throws Exception declared for callers; the first failing command stops the
+     *         iteration with the exception thrown by {@code DockerUtils.runCommand}
+     */
+    public static void runOnAllBrokers(DockerClient docker, String cluster, String... cmds) throws Exception {
+        Set<String> brokers = DockerUtils.cubeIdsWithLabels(
+                docker, ImmutableMap.of("service", "pulsar-broker", "cluster", cluster));
+        for (String broker : brokers) {
+            DockerUtils.runCommand(docker, broker, cmds);
+        }
+    }
+
+    /**
+     * Polls ZooKeeper once per second until the broker's registration node
+     * {@code /loadbalance/brokers/<hostname>:<BROKER_PORT>} exists
+     * ({@code upOrDown == true}) or is absent ({@code upOrDown == false}).
+     *
+     * If the thread is interrupted, the interrupt flag is restored before
+     * returning. A failure while closing the ZooKeeper client is only logged; it
+     * no longer turns a successful wait into {@code false}.
+     *
+     * @param docker client used to inspect the container and reach zookeeper
+     * @param containerId id of the broker container
+     * @param timeout maximum time to poll, in {@code timeoutUnit}
+     * @param timeoutUnit unit of {@code timeout}
+     * @param upOrDown {@code true} to wait for the node to appear, {@code false}
+     *        to wait for it to disappear
+     * @return {@code true} if the requested state was observed in time;
+     *         {@code false} on timeout, missing cluster label, interruption or error
+     */
+    private static boolean waitBrokerState(DockerClient docker, String containerId,
+                                           int timeout, TimeUnit timeoutUnit,
+                                           boolean upOrDown) {
+        long timeoutMillis = timeoutUnit.toMillis(timeout);
+        long pollMillis = 1000;
+        String brokerId = DockerUtils.getContainerHostname(docker, containerId) + ":" + BROKER_PORT;
+        Optional<String> containerCluster = DockerUtils.getContainerCluster(docker, containerId);
+        if (!containerCluster.isPresent()) {
+            LOG.error("Unable to determine cluster for container {}. Missing label?", containerId);
+            return false;
+        }
+
+        ZooKeeper zk = null;
+        try {
+            zk = zookeeperClient(docker, containerCluster.get());
+            String path = "/loadbalance/brokers/" + brokerId;
+            while (timeoutMillis > 0) {
+                if ((zk.exists(path, false) != null) == upOrDown) {
+                    return true;
+                }
+                Thread.sleep(pollMillis);
+                timeoutMillis -= pollMillis;
+            }
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+            LOG.error("Exception checking for broker state", ie);
+            return false;
+        } catch (Exception e) {
+            LOG.error("Exception checking for broker state", e);
+            return false;
+        } finally {
+            if (zk != null) {
+                try {
+                    zk.close();
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Exception closing zookeeper client", ie);
+                } catch (Exception e) {
+                    // no return here: a return in finally would override the result above
+                    LOG.error("Exception closing zookeeper client", e);
+                }
+            }
+        }
+        LOG.warn("Broker {} didn't go {} after {} seconds",
+                 containerId, upOrDown ? "up" : "down",
+                 timeoutUnit.toSeconds(timeout));
+        return false;
+    }
+
+    /**
+     * Waits for a broker to be up: first until it is registered in ZooKeeper,
+     * then until a TCP connection to {@code BROKER_PORT} on its IP succeeds.
+     *
+     * The timeout is applied to each of the two phases separately. The port is
+     * probed every 100 ms.
+     *
+     * @param docker client used to inspect the container
+     * @param containerId id of the broker container
+     * @param timeout maximum time for each phase, in {@code timeoutUnit}
+     * @param timeoutUnit unit of {@code timeout}
+     * @return {@code true} once the port accepts a connection; {@code false} on
+     *         timeout, failed registration wait or interruption
+     */
+    public static boolean waitBrokerUp(DockerClient docker, String containerId,
+                                       int timeout, TimeUnit timeoutUnit) {
+        if (!waitBrokerState(docker, containerId, timeout, timeoutUnit, true)) {
+            return false;
+        }
+
+        String address = DockerUtils.getContainerIP(docker, containerId);
+        final long stepMillis = 100;
+        for (long remaining = timeoutUnit.toMillis(timeout); remaining > 0; remaining -= stepMillis) {
+            try (Socket probe = new Socket(address, BROKER_PORT)) {
+                return true;
+            } catch (Exception e) {
+                // couldn't connect, try again after sleep
+            }
+            try {
+                Thread.sleep(stepMillis);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Waits until a broker is no longer registered in ZooKeeper.
+     *
+     * @param docker client used to inspect the container
+     * @param containerId id of the broker container
+     * @param timeout maximum time to wait, in {@code timeoutUnit}
+     * @param timeoutUnit unit of {@code timeout}
+     * @return {@code true} if the registration disappeared in time
+     */
+    public static boolean waitBrokerDown(DockerClient docker, String containerId,
+                                         int timeout, TimeUnit timeoutUnit) {
+        final boolean expectUp = false;
+        return waitBrokerState(docker, containerId, timeout, timeoutUnit, expectUp);
+    }
+
+    /**
+     * Waits up to 10 seconds per phase for every broker of a cluster to come up.
+     * Every broker is waited for, even after one has already failed.
+     *
+     * @param docker client used to find and inspect the brokers
+     * @param cluster value of the {@code cluster} label to select
+     * @return {@code true} if all brokers came up (also when there are none)
+     */
+    public static boolean waitAllBrokersUp(DockerClient docker, String cluster) {
+        boolean allUp = true;
+        for (String broker : brokerSet(docker, cluster)) {
+            boolean up = waitBrokerUp(docker, broker, 10, TimeUnit.SECONDS);
+            allUp = allUp && up;
+        }
+        return allUp;
+    }
+
+    /**
+     * Waits up to 10 seconds for every broker of a cluster to go down.
+     * Every broker is waited for, even after one has already failed.
+     *
+     * @param docker client used to find and inspect the brokers
+     * @param cluster value of the {@code cluster} label to select
+     * @return {@code true} if all brokers went down (also when there are none)
+     */
+    public static boolean waitAllBrokersDown(DockerClient docker, String cluster) {
+        boolean allDown = true;
+        for (String broker : brokerSet(docker, cluster)) {
+            boolean down = waitBrokerDown(docker, broker, 10, TimeUnit.SECONDS);
+            allDown = allDown && down;
+        }
+        return allDown;
+    }
+
+    /**
+     * Waits for supervisord to be running in a container, detected by the
+     * existence of the socket {@code /var/run/supervisor/supervisor.sock}.
+     *
+     * Makes at most 50 attempts, pausing 100 ms between them. The attempt counter
+     * now counts towards the limit, so the method returns {@code false} instead
+     * of looping forever when supervisord never starts.
+     *
+     * @param docker client used to run the check inside the container
+     * @param containerId id of the container
+     * @return {@code true} as soon as the socket exists; {@code false} when the
+     *         attempts are exhausted or the thread is interrupted
+     */
+    public static boolean waitSupervisord(DockerClient docker, String containerId) {
+        final int maxAttempts = 50;
+        for (int attempt = 0; attempt < maxAttempts; attempt++) {
+            try {
+                DockerUtils.runCommand(docker, containerId, "test", "-S", "/var/run/supervisor/supervisor.sock");
+                return true;
+            } catch (Exception e) {
+                // supervisord not running yet; retry after a short pause
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Starts the {@code broker} supervisor service on every broker container of a
+     * cluster, then waits for all brokers to come up.
+     *
+     * The result of waiting for supervisord is not checked before the start
+     * command is issued.
+     *
+     * @param docker client used to find the brokers and run commands
+     * @param cluster value of the {@code cluster} label to select
+     * @return {@code true} if all brokers came up
+     */
+    public static boolean startAllBrokers(DockerClient docker, String cluster) {
+        for (String broker : brokerSet(docker, cluster)) {
+            waitSupervisord(docker, broker);
+            DockerUtils.runCommand(docker, broker, "supervisorctl", "start", "broker");
+        }
+        return waitAllBrokersUp(docker, cluster);
+    }
+
+    /**
+     * Stops the {@code broker} supervisor service on every broker container of a
+     * cluster, then waits for all brokers to go down.
+     *
+     * @param docker client used to find the brokers and run commands
+     * @param cluster value of the {@code cluster} label to select
+     * @return {@code true} if all brokers went down
+     */
+    public static boolean stopAllBrokers(DockerClient docker, String cluster) {
+        for (String broker : brokerSet(docker, cluster)) {
+            DockerUtils.runCommand(docker, broker, "supervisorctl", "stop", "broker");
+        }
+        return waitAllBrokersDown(docker, cluster);
+    }
+
+    /**
+     * Returns the ids of all cubes labelled {@code service=pulsar-broker} and
+     * {@code cluster=<cluster>}.
+     *
+     * @param docker client used to inspect the containers
+     * @param cluster value of the {@code cluster} label to select
+     * @return the ids of the broker cubes of that cluster
+     */
+    public static Set<String> brokerSet(DockerClient docker, String cluster) {
+        ImmutableMap<String, String> brokerLabels =
+            ImmutableMap.of("service", "pulsar-broker", "cluster", cluster);
+        return DockerUtils.cubeIdsWithLabels(docker, brokerLabels);
+    }
+
+    /**
+     * Waits until a TCP connection to {@code BROKER_PORT} on a proxy container's
+     * IP succeeds, probing every 100 ms.
+     *
+     * @param docker client used to look up the container's IP address
+     * @param containerId id of the proxy container
+     * @param timeout maximum time to wait, in {@code timeoutUnit}
+     * @param timeoutUnit unit of {@code timeout}
+     * @return {@code true} once the port accepts a connection; {@code false} on
+     *         timeout or interruption
+     */
+    public static boolean waitProxyUp(DockerClient docker, String containerId,
+                                      int timeout, TimeUnit timeoutUnit) {
+        String address = DockerUtils.getContainerIP(docker, containerId);
+        final long stepMillis = 100;
+
+        for (long remaining = timeoutUnit.toMillis(timeout); remaining > 0; remaining -= stepMillis) {
+            try (Socket probe = new Socket(address, BROKER_PORT)) {
+                return true;
+            } catch (Exception e) {
+                // couldn't connect, try again after sleep
+            }
+            try {
+                Thread.sleep(stepMillis);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Waits up to 10 seconds for every proxy of a cluster to accept connections.
+     * Every proxy is waited for, even after one has already failed.
+     *
+     * @param docker client used to find and inspect the proxies
+     * @param cluster value of the {@code cluster} label to select
+     * @return {@code true} if all proxies came up (also when there are none)
+     */
+    public static boolean waitAllProxiesUp(DockerClient docker, String cluster) {
+        boolean allUp = true;
+        for (String proxy : proxySet(docker, cluster)) {
+            boolean up = waitProxyUp(docker, proxy, 10, TimeUnit.SECONDS);
+            allUp = allUp && up;
+        }
+        return allUp;
+    }
+
+    /**
+     * Starts the {@code proxy} supervisor service on every proxy container of a
+     * cluster, then waits for all proxies to come up.
+     *
+     * The result of waiting for supervisord is not checked before the start
+     * command is issued.
+     *
+     * @param docker client used to find the proxies and run commands
+     * @param cluster value of the {@code cluster} label to select
+     * @return {@code true} if all proxies came up
+     */
+    public static boolean startAllProxies(DockerClient docker, String cluster) {
+        for (String proxy : proxySet(docker, cluster)) {
+            waitSupervisord(docker, proxy);
+            DockerUtils.runCommand(docker, proxy, "supervisorctl", "start", "proxy");
+        }
+        return waitAllProxiesUp(docker, cluster);
+    }
+
+    /**
+     * Stops the {@code proxy} supervisor service on every proxy container of a
+     * cluster. Does not wait for the proxies to go down.
+     *
+     * @param docker client used to find the proxies and run commands
+     * @param cluster value of the {@code cluster} label to select
+     */
+    public static void stopAllProxies(DockerClient docker, String cluster) {
+        for (String proxy : proxySet(docker, cluster)) {
+            DockerUtils.runCommand(docker, proxy, "supervisorctl", "stop", "proxy");
+        }
+    }
+
+    /**
+     * Returns the ids of all cubes labelled {@code service=pulsar-proxy} and
+     * {@code cluster=<cluster>}.
+     *
+     * @param docker client used to inspect the containers
+     * @param cluster value of the {@code cluster} label to select
+     * @return the ids of the proxy cubes of that cluster
+     */
+    public static Set<String> proxySet(DockerClient docker, String cluster) {
+        ImmutableMap<String, String> proxyLabels =
+            ImmutableMap.of("service", "pulsar-proxy", "cluster", cluster);
+        return DockerUtils.cubeIdsWithLabels(docker, proxyLabels);
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java
new file mode 100644
index 0000000..0023359
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.arquillian.cube.docker.impl.docker.DockerClientExecutor;
+import org.arquillian.cube.impl.model.CubeId;
+import org.arquillian.cube.spi.beforeStop.BeforeStopAction;
+
+/**
+ * Before-stop action that dumps both the docker log of the container and the
+ * files under {@code /var/log/pulsar} inside it to the build output.
+ *
+ * NOTE(review): the two setters are presumably called by arquillian cube before
+ * {@link #doBeforeStop()} - confirm against the BeforeStopAction contract.
+ */
+public class PulsarLogsToTargetDirStopAction implements BeforeStopAction {
+    private DockerClientExecutor dockerClientExecutor;
+    private CubeId containerID;
+
+    /** Sets the executor that provides the docker client. */
+    public void setDockerClientExecutor(DockerClientExecutor executor) {
+        dockerClientExecutor = executor;
+    }
+
+    /** Sets the id of the cube whose logs are dumped. */
+    public void setContainerID(CubeId containerID) {
+        this.containerID = containerID;
+    }
+
+    /** Dumps the docker log first, then the pulsar log directory. */
+    @Override
+    public void doBeforeStop() {
+        String id = containerID.getId();
+        DockerUtils.dumpContainerLogToTarget(dockerClientExecutor.getDockerClient(), id);
+        DockerUtils.dumpContainerLogDirToTarget(dockerClientExecutor.getDockerClient(), id, "/var/log/pulsar");
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java
new file mode 100644
index 0000000..7cad4cb
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import java.util.concurrent.TimeUnit;
+
+import org.arquillian.cube.spi.Cube;
+import org.arquillian.cube.spi.await.AwaitStrategy;
+import org.arquillian.cube.docker.impl.client.config.Await;
+import org.arquillian.cube.docker.impl.docker.DockerClientExecutor;
+import org.arquillian.cube.docker.impl.util.Ping;
+import org.arquillian.cube.docker.impl.util.PingCommand;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Await strategy whose readiness check is
+ * {@link PulsarClusterUtils#zookeeperRunning}, evaluated for this strategy's
+ * cube through {@link Ping#ping}.
+ */
+public class ZooKeeperAwaitStrategy implements AwaitStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperAwaitStrategy.class);
+
+    // Arguments handed to Ping.ping(): iteration count, sleep time and its unit.
+    private static final int DEFAULT_POLL_ITERATIONS = 10;
+    private static final int DEFAULT_SLEEP_TIME = 1;
+    private static final TimeUnit DEFAULT_SLEEP_TIMEUNIT = TimeUnit.SECONDS;
+
+    // NOTE(review): nothing in this class assigns these two fields; presumably
+    // Arquillian Cube injects them reflectively before await() -- confirm.
+    private Cube<?> cube;
+    private DockerClientExecutor dockerClientExecutor;
+
+    /**
+     * Runs the ZooKeeper check through {@link Ping#ping} using the default
+     * iteration count and sleep settings of this class.
+     *
+     * @return whatever {@link Ping#ping} returns for the ZooKeeper check
+     */
+    @Override
+    public boolean await() {
+        // The check itself: ask PulsarClusterUtils whether ZooKeeper runs in this cube.
+        final PingCommand zookeeperCheck = new PingCommand() {
+            @Override
+            public boolean call() {
+                return PulsarClusterUtils.zookeeperRunning(
+                        dockerClientExecutor.getDockerClient(), cube.getId());
+            }
+        };
+        return Ping.ping(DEFAULT_POLL_ITERATIONS, DEFAULT_SLEEP_TIME, DEFAULT_SLEEP_TIMEUNIT,
+                zookeeperCheck);
+    }
+}
diff --git a/tests/integration-tests-utils/src/main/resources/log4j2.yml b/tests/integration-tests-utils/src/main/resources/log4j2.yml
new file mode 100644
index 0000000..94ad627
--- /dev/null
+++ b/tests/integration-tests-utils/src/main/resources/log4j2.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Log4j2 configuration named "test": one console appender, the root logger
+# at warn and the org.apache.pulsar logger at info.
+Configuration:
+  name: test
+
+  Appenders:
+
+    # Console appender targeting SYSTEM_OUT
+    Console:
+      name: Console
+      target: SYSTEM_OUT
+      PatternLayout:
+        # Layout: time of day, [thread], level, logger name, message, newline.
+        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
+
+  Loggers:
+
+    # Root logger: level warn, attached to the Console appender above.
+    Root:
+      level: warn
+      AppenderRef:
+        - ref: Console
+
+    # Pulsar's own packages are logged at info.
+    # NOTE(review): no AppenderRef is declared here; presumably output reaches
+    # Console through the Root logger (Log4j2 additivity) -- confirm.
+    Logger:
+      name: org.apache.pulsar
+      level: info
diff --git a/tests/pom.xml b/tests/pom.xml
index ea6f20b..5c8429d 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -33,6 +33,7 @@
   <name>Apache Pulsar :: Tests</name>
   <modules>
     <module>docker-images</module>
+    <module>integration-tests-utils</module>
   </modules>
   <build>
     <plugins>

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to