merlimat closed pull request #1310: Utilities for arquillian integration tests
URL: https://github.com/apache/incubator-pulsar/pull/1310
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/integration-tests-utils/pom.xml 
b/tests/integration-tests-utils/pom.xml
new file mode 100644
index 000000000..c6d74c853
--- /dev/null
+++ b/tests/integration-tests-utils/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="
+  http://maven.apache.org/POM/4.0.0
+  http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar.tests</groupId>
+    <artifactId>tests-parent</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.tests</groupId>
+  <artifactId>integration-tests-utils</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Pulsar :: Tests :: Utility module for Arquillian based 
integration tests</name>
+
+  <properties>
+    <arquillian-cube.version>1.15.1</arquillian-cube.version>
+    <commons-compress.version>1.15</commons-compress.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>${commons-compress.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.arquillian.cube</groupId>
+      <artifactId>arquillian-cube-docker</artifactId>
+      <version>${arquillian-cube.version}</version>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
new file mode 100644
index 000000000..fb69dc7a6
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.async.ResultCallback;
+import com.github.dockerjava.api.command.InspectExecResponse;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.api.model.ContainerNetwork;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DockerUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DockerUtils.class);
+
+    private static File getTargetDirectory(String containerId) {
+        String base = System.getProperty("maven.buildDirectory");
+        if (base == null) {
+            base = "target";
+        }
+        File directory = new File(base + "/container-logs/" + containerId);
+        if (!directory.exists() && !directory.mkdirs()) {
+            LOG.error("Error creating directory for container logs.");
+        }
+        return directory;
+    }
+
+    public static void dumpContainerLogToTarget(DockerClient docker, String 
containerId) {
+        File output = new File(getTargetDirectory(containerId), "docker.log");
+        try (FileOutputStream os = new FileOutputStream(output)) {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            docker.logContainerCmd(containerId).withStdOut(true)
+                .withStdErr(true).withTimestamps(true).exec(new 
ResultCallback<Frame>() {
+                        @Override
+                        public void close() {}
+
+                        @Override
+                        public void onStart(Closeable closeable) {}
+
+                        @Override
+                        public void onNext(Frame object) {
+                            try {
+                                os.write(object.getPayload());
+                            } catch (IOException e) {
+                                onError(e);
+                            }
+                        }
+
+                        @Override
+                        public void onError(Throwable throwable) {
+                            future.completeExceptionally(throwable);
+                        }
+
+                        @Override
+                        public void onComplete() {
+                            future.complete(true);
+                        }
+                    });
+            future.get();
+        } catch (RuntimeException|ExecutionException|IOException e) {
+            LOG.error("Error dumping log for {}", containerId, e);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            LOG.info("Interrupted dumping log from container {}", containerId, 
ie);
+        }
+    }
+
+    public static void dumpContainerLogDirToTarget(DockerClient docker, String 
containerId, String path) {
+        final int READ_BLOCK_SIZE = 10000;
+
+        try (InputStream dockerStream = 
docker.copyArchiveFromContainerCmd(containerId, path).exec();
+             TarArchiveInputStream stream = new 
TarArchiveInputStream(dockerStream)) {
+            TarArchiveEntry entry = stream.getNextTarEntry();
+            while (entry != null) {
+                if (entry.isFile()) {
+                    File output = new File(getTargetDirectory(containerId), 
entry.getName().replace("/", "-"));
+                    try (FileOutputStream os = new FileOutputStream(output)) {
+                        byte[] block = new byte[READ_BLOCK_SIZE];
+                        int read = stream.read(block, 0, READ_BLOCK_SIZE);
+                        while (read > -1) {
+                            os.write(block, 0, read);
+                            read = stream.read(block, 0, READ_BLOCK_SIZE);
+                        }
+                    }
+                }
+                entry = stream.getNextTarEntry();
+            }
+        } catch (RuntimeException|IOException e) {
+            LOG.error("Error reading logs from container {}", containerId, e);
+        }
+    }
+
+    public static String getContainerIP(DockerClient docker, String 
containerId) {
+        for (Map.Entry<String, ContainerNetwork> e : 
docker.inspectContainerCmd(containerId)
+                 .exec().getNetworkSettings().getNetworks().entrySet()) {
+            return e.getValue().getIpAddress();
+        }
+        throw new IllegalArgumentException("Container " + containerId + " has 
no networks");
+    }
+
+    public static String getContainerHostname(DockerClient docker, String 
containerId) {
+        return runCommand(docker, containerId, "hostname").trim();
+    }
+
+    public static String runCommand(DockerClient docker, String containerId, 
String... cmd) {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        String execid = docker.execCreateCmd(containerId).withCmd(cmd)
+            .withAttachStderr(true).withAttachStdout(true).exec().getId();
+        String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+        StringBuffer output = new StringBuffer();
+        docker.execStartCmd(execid).withDetach(false)
+            .exec(new ResultCallback<Frame>() {
+                @Override
+                public void close() {}
+
+                @Override
+                public void onStart(Closeable closeable) {
+                    LOG.info("DOCKER.exec({}:{}): Executing...", containerId, 
cmdString);
+                }
+
+                @Override
+                public void onNext(Frame object) {
+                    LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, 
object);
+                    output.append(new String(object.getPayload()));
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    future.completeExceptionally(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    LOG.info("DOCKER.exec({}:{}): Done", containerId, 
cmdString);
+                    future.complete(true);
+                }
+            });
+        future.join();
+
+        InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+        while (resp.isRunning()) {
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(ie);
+            }
+            resp = docker.inspectExecCmd(execid).exec();
+        }
+        int retCode = resp.getExitCode();
+        if (retCode != 0) {
+            throw new RuntimeException(
+                    String.format("cmd(%s) failed on %s with exitcode %d",
+                                  cmdString, containerId, retCode));
+        } else {
+            LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, 
cmdString, retCode);
+        }
+        return output.toString();
+    }
+
+    public static Optional<String> getContainerCluster(DockerClient docker, 
String containerId) {
+        return Optional.ofNullable(docker.inspectContainerCmd(containerId)
+                                   
.exec().getConfig().getLabels().get("cluster"));
+    }
+
+    public static Set<String> allCubeIds() {
+        Pattern pattern = Pattern.compile("^arq.cube.docker.([^.]*).ip$");
+        return System.getProperties().keySet().stream()
+            .map(k -> pattern.matcher(k.toString()))
+            .filter(m -> m.matches())
+            .map(m -> m.group(1))
+            .collect(Collectors.toSet());
+    }
+
+    public static Set<String> cubeIdsWithLabels(DockerClient docker, 
Map<String,String> labels) {
+        return allCubeIds().stream()
+            .filter(id -> {
+                    Map<String,String> configuredLabels = 
docker.inspectContainerCmd(id).exec().getConfig().getLabels();
+                    return labels.entrySet().stream()
+                        .map(e -> configuredLabels.containsKey(e.getKey())
+                             && 
configuredLabels.get(e.getKey()).equals(e.getValue()))
+                        .reduce(true, (acc, res) -> acc && res);
+                })
+            .collect(Collectors.toSet());
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java
new file mode 100644
index 000000000..f74ce471f
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.arquillian.cube.docker.impl.docker.DockerClientExecutor;
+import org.arquillian.cube.impl.model.CubeId;
+import org.arquillian.cube.spi.beforeStop.BeforeStopAction;
+
+public class LogToTargetDirStopAction implements BeforeStopAction {
+    private DockerClientExecutor dockerClientExecutor;
+    private CubeId containerID;
+
+    public void setDockerClientExecutor(DockerClientExecutor executor) {
+        this.dockerClientExecutor = executor;
+    }
+
+    public void setContainerID(CubeId containerID) {
+        this.containerID = containerID;
+    }
+
+    @Override
+    public void doBeforeStop() {
+        
DockerUtils.dumpContainerLogToTarget(dockerClientExecutor.getDockerClient(), 
containerID.getId());
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java
new file mode 100644
index 000000000..d9ef56c8c
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.arquillian.cube.spi.await.AwaitStrategy;
+
+public class NoopAwaitStrategy implements AwaitStrategy {
+    @Override
+    public boolean await() {
+        return true;
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
new file mode 100644
index 000000000..c5b8e7019
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.github.dockerjava.api.DockerClient;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.net.Socket;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarClusterUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarClusterUtils.class);
+    static final short BROKER_PORT = 8080;
+
+    /**
+     * Build the ZooKeeper connect string for a cluster from the IPs of all
+     * cubes labelled {@code service=zookeeper} and {@code cluster=<cluster>}.
+     *
+     * @param docker client used to inspect the containers
+     * @param cluster value of the {@code cluster} label to match
+     * @return comma-separated list of container IPs (no port is appended)
+     */
+    public static String zookeeperConnectString(DockerClient docker, String cluster) {
+        // ZooKeeper separates servers with ','; ':' is the host/port separator,
+        // so joining several IPs with ':' produced an unparseable string.
+        return DockerUtils.cubeIdsWithLabels(docker, ImmutableMap.of("service", "zookeeper", "cluster", cluster))
+            .stream().map((id) -> DockerUtils.getContainerIP(docker, id)).collect(Collectors.joining(","));
+    }
+
+    public static ZooKeeper zookeeperClient(DockerClient docker, String 
cluster) throws Exception {
+        String connectString = zookeeperConnectString(docker, cluster);
+        LOG.info("Connecting to zookeeper {}", connectString);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ZooKeeper zk = new ZooKeeper(connectString, 10000,
+                                     (e) -> {
+                                         if 
(e.getState().equals(KeeperState.SyncConnected)) {
+                                             future.complete(null);
+                                         }
+                                     });
+        future.get();
+        return zk;
+    }
+
+    public static boolean zookeeperRunning(DockerClient docker, String 
containerId) {
+        String ip = DockerUtils.getContainerIP(docker, containerId);
+        try (Socket socket = new Socket(ip, 2181)) {
+            socket.setSoTimeout(1000);
+            socket.getOutputStream().write("ruok".getBytes(UTF_8));
+            byte[] resp = new byte[4];
+            if (socket.getInputStream().read(resp) == 4) {
+                return new String(resp, UTF_8).equals("imok");
+            }
+        } catch (IOException e) {
+            // ignore, we'll return fallthrough to return false
+        }
+        return false;
+    }
+
+    public static boolean runOnAnyBroker(DockerClient docker, String cluster, 
String... cmds) throws Exception {
+        Optional<String> broker = DockerUtils.cubeIdsWithLabels(
+                docker,ImmutableMap.of("service", "pulsar-broker", "cluster", 
cluster)).stream().findAny();
+        if (broker.isPresent()) {
+            DockerUtils.runCommand(docker, broker.get(), cmds);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public static void runOnAllBrokers(DockerClient docker, String cluster, 
String... cmds) throws Exception {
+        DockerUtils.cubeIdsWithLabels(docker,ImmutableMap.of("service", 
"pulsar-broker", "cluster", cluster))
+            .stream().forEach((b) -> DockerUtils.runCommand(docker, b, cmds));
+    }
+
+    private static boolean waitBrokerState(DockerClient docker, String 
containerId,
+                                           int timeout, TimeUnit timeoutUnit,
+                                           boolean upOrDown) {
+        long timeoutMillis = timeoutUnit.toMillis(timeout);
+        long pollMillis = 1000;
+        String brokerId = DockerUtils.getContainerHostname(docker, 
containerId) + ":" + BROKER_PORT;
+        Optional<String> containerCluster = 
DockerUtils.getContainerCluster(docker, containerId);
+        if (!containerCluster.isPresent()) {
+            LOG.error("Unable to determine cluster for container {}. Missing 
label?", containerId);
+            return false;
+        }
+
+        ZooKeeper zk = null;
+        try {
+            zk = zookeeperClient(docker, containerCluster.get());
+            String path = "/loadbalance/brokers/" + brokerId;
+            while (timeoutMillis > 0) {
+                if ((zk.exists(path, false) != null) == upOrDown) {
+                    return true;
+                }
+                Thread.sleep(pollMillis);
+                timeoutMillis -= pollMillis;
+            }
+        } catch (Exception e) {
+            LOG.error("Exception checking for broker state", e);
+            return false;
+        } finally {
+            try {
+                if (zk != null) {
+                    zk.close();
+                }
+            } catch (Exception e) {
+                LOG.error("Exception closing zookeeper client", e);
+                return false;
+            }
+        }
+        LOG.warn("Broker {} didn't go {} after {} seconds",
+                 containerId, upOrDown ? "up" : "down",
+                 timeoutUnit.toSeconds(timeout));
+        return false;
+    }
+
+    public static boolean waitBrokerUp(DockerClient docker, String containerId,
+                                       int timeout, TimeUnit timeoutUnit) {
+        if (waitBrokerState(docker, containerId, timeout, timeoutUnit, true)) {
+            String ip = DockerUtils.getContainerIP(docker, containerId);
+
+            long timeoutMillis = timeoutUnit.toMillis(timeout);
+            long pollMillis = 100;
+
+            while (timeoutMillis > 0) {
+                try (Socket socket = new Socket(ip, BROKER_PORT)) {
+                    return true;
+                } catch (Exception e) {
+                    // couldn't connect, try again after sleep
+                }
+                try {
+                    Thread.sleep(pollMillis);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                timeoutMillis -= pollMillis;
+            }
+        }
+        return false;
+    }
+
+    public static boolean waitBrokerDown(DockerClient docker, String 
containerId,
+                                         int timeout, TimeUnit timeoutUnit) {
+        return waitBrokerState(docker, containerId, timeout, timeoutUnit, 
false);
+    }
+
+    public static boolean waitAllBrokersUp(DockerClient docker, String 
cluster) {
+        return brokerSet(docker, cluster).stream()
+            .map((b) -> waitBrokerUp(docker, b, 10, TimeUnit.SECONDS))
+            .reduce(true, (accum, res) -> accum && res);
+    }
+
+    public static boolean waitAllBrokersDown(DockerClient docker, String 
cluster) {
+        return brokerSet(docker, cluster).stream()
+            .map((b) -> waitBrokerDown(docker, b, 10, TimeUnit.SECONDS))
+            .reduce(true, (accum, res) -> accum && res);
+    }
+
+    /**
+     * Wait for supervisord inside a container by running {@code test -S} on
+     * its control socket until the command succeeds.
+     *
+     * @param docker client used to run the check inside the container
+     * @param containerId container to probe
+     * @return true once the socket check succeeds; false if all 50 attempts
+     *         (100ms apart) fail or the thread is interrupted
+     */
+    public static boolean waitSupervisord(DockerClient docker, String containerId) {
+        int i = 50;
+        while (i > 0) {
+            try {
+                DockerUtils.runCommand(docker, containerId, "test", "-S", "/var/run/supervisor/supervisor.sock");
+                return true;
+            } catch (Exception e) {
+                // supervisord not running
+            }
+            try {
+                Thread.sleep(100);
+                // count down: incrementing here meant the retry limit never applied
+                i--;
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        return false;
+    }
+
+    public static boolean startAllBrokers(DockerClient docker, String cluster) 
{
+        brokerSet(docker, cluster).stream().forEach(
+                (b) -> {
+                    waitSupervisord(docker, b);
+                    DockerUtils.runCommand(docker, b, "supervisorctl", 
"start", "broker");
+                });
+
+        return waitAllBrokersUp(docker, cluster);
+    }
+
+    public static boolean stopAllBrokers(DockerClient docker, String cluster) {
+        brokerSet(docker, cluster).stream().forEach(
+                (b) -> DockerUtils.runCommand(docker, b, "supervisorctl", 
"stop", "broker"));
+
+        return waitAllBrokersDown(docker, cluster);
+    }
+
+    public static Set<String> brokerSet(DockerClient docker, String cluster) {
+        return DockerUtils.cubeIdsWithLabels(docker, 
ImmutableMap.of("service", "pulsar-broker",
+                                                                     
"cluster", cluster));
+    }
+
+    public static boolean waitProxyUp(DockerClient docker, String containerId,
+                                      int timeout, TimeUnit timeoutUnit) {
+        String ip = DockerUtils.getContainerIP(docker, containerId);
+        long timeoutMillis = timeoutUnit.toMillis(timeout);
+        long pollMillis = 100;
+
+        while (timeoutMillis > 0) {
+            try (Socket socket = new Socket(ip, BROKER_PORT)) {
+                return true;
+            } catch (Exception e) {
+                // couldn't connect, try again after sleep
+            }
+            try {
+                Thread.sleep(pollMillis);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+            timeoutMillis -= pollMillis;
+        }
+        return false;
+    }
+
+    public static boolean waitAllProxiesUp(DockerClient docker, String 
cluster) {
+        return proxySet(docker, cluster).stream()
+            .map((b) -> waitProxyUp(docker, b, 10, TimeUnit.SECONDS))
+            .reduce(true, (accum, res) -> accum && res);
+    }
+
+    public static boolean startAllProxies(DockerClient docker, String cluster) 
{
+        proxySet(docker, cluster).stream().forEach(
+                (b) -> {
+                    waitSupervisord(docker, b);
+                    DockerUtils.runCommand(docker, b, "supervisorctl", 
"start", "proxy");
+                });
+
+        return waitAllProxiesUp(docker, cluster);
+    }
+
+    public static void stopAllProxies(DockerClient docker, String cluster) {
+        proxySet(docker, cluster).stream().forEach(
+                (b) -> DockerUtils.runCommand(docker, b, "supervisorctl", 
"stop", "proxy"));
+    }
+
+    public static Set<String> proxySet(DockerClient docker, String cluster) {
+        return DockerUtils.cubeIdsWithLabels(docker, 
ImmutableMap.of("service", "pulsar-proxy",
+                                                                     
"cluster", cluster));
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java
new file mode 100644
index 000000000..00233595d
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.arquillian.cube.docker.impl.docker.DockerClientExecutor;
+import org.arquillian.cube.impl.model.CubeId;
+import org.arquillian.cube.spi.beforeStop.BeforeStopAction;
+
+public class PulsarLogsToTargetDirStopAction implements BeforeStopAction {
+    private DockerClientExecutor dockerClientExecutor;
+    private CubeId containerID;
+
+    public void setDockerClientExecutor(DockerClientExecutor executor) {
+        this.dockerClientExecutor = executor;
+    }
+
+    public void setContainerID(CubeId containerID) {
+        this.containerID = containerID;
+    }
+
+    @Override
+    public void doBeforeStop() {
+        
DockerUtils.dumpContainerLogToTarget(dockerClientExecutor.getDockerClient(), 
containerID.getId());
+        
DockerUtils.dumpContainerLogDirToTarget(dockerClientExecutor.getDockerClient(),
+                                                containerID.getId(), 
"/var/log/pulsar");
+    }
+}
diff --git 
a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java
 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java
new file mode 100644
index 000000000..7cad4cb76
--- /dev/null
+++ 
b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import java.util.concurrent.TimeUnit;
+
+import org.arquillian.cube.spi.Cube;
+import org.arquillian.cube.spi.await.AwaitStrategy;
+import org.arquillian.cube.docker.impl.client.config.Await;
+import org.arquillian.cube.docker.impl.docker.DockerClientExecutor;
+import org.arquillian.cube.docker.impl.util.Ping;
+import org.arquillian.cube.docker.impl.util.PingCommand;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperAwaitStrategy implements AwaitStrategy {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperAwaitStrategy.class);
+
+    private static final int DEFAULT_POLL_ITERATIONS = 10;
+    private static final int DEFAULT_SLEEP_TIME = 1;
+    private static final TimeUnit DEFAULT_SLEEP_TIMEUNIT = TimeUnit.SECONDS;
+
+    private Cube<?> cube;
+    private DockerClientExecutor dockerClientExecutor;
+
+    @Override
+    public boolean await() {
+        return Ping.ping(DEFAULT_POLL_ITERATIONS, DEFAULT_SLEEP_TIME, 
DEFAULT_SLEEP_TIMEUNIT,
+                new PingCommand() {
+                    @Override
+                    public boolean call() {
+                        return 
PulsarClusterUtils.zookeeperRunning(dockerClientExecutor.getDockerClient(),
+                                                                   
cube.getId());
+                    }
+                });
+    }
+}
diff --git a/tests/integration-tests-utils/src/main/resources/log4j2.yml 
b/tests/integration-tests-utils/src/main/resources/log4j2.yml
new file mode 100644
index 000000000..94ad627c6
--- /dev/null
+++ b/tests/integration-tests-utils/src/main/resources/log4j2.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+Configuration:
+  name: test
+
+  Appenders:
+
+    # Console
+    Console:
+      name: Console
+      target: SYSTEM_OUT
+      PatternLayout:
+        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
+
+  Loggers:
+
+    Root:
+      level: warn
+      AppenderRef:
+        - ref: Console
+
+    Logger:
+      name: org.apache.pulsar
+      level: info
diff --git a/tests/pom.xml b/tests/pom.xml
index ea6f20b59..5c8429d66 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -33,6 +33,7 @@
   <name>Apache Pulsar :: Tests</name>
   <modules>
     <module>docker-images</module>
+    <module>integration-tests-utils</module>
   </modules>
   <build>
     <plugins>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to