This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b59248  Migrate compaction, s3offload to test containers (#2101)
7b59248 is described below

commit 7b59248ad87de2c9b96f56c542ff87a87a44e0f2
Author: Ali Ahmed <[email protected]>
AuthorDate: Tue Jul 17 02:42:25 2018 -0700

    Migrate compaction, s3offload to test containers (#2101)
    
    ### Motivation
    
    This is part of the effort to migrate the integration tests from the
    Arquillian framework to Testcontainers.
---
 .../pulsar/tests/containers/S3Container.java       |  54 +++++
 .../pulsar/tests/topologies/PulsarCluster.java     | 122 +++++++----
 .../pulsar/tests/topologies/PulsarClusterSpec.java |   5 +-
 .../tests/topologies/PulsarClusterTestBase.java    |  61 +++---
 tests/integration/compaction/pom.xml               |  17 +-
 .../pulsar/tests/integration/TestCompaction.java   | 140 ++++++-------
 .../compaction/src/test/resources/arquillian.xml   |  32 ---
 tests/integration/s3-offload/pom.xml               |  11 +-
 .../pulsar/tests/integration/TestS3Offload.java    | 228 ++++++++++-----------
 .../s3-offload/src/test/resources/arquillian.xml   |  32 ---
 10 files changed, 364 insertions(+), 338 deletions(-)

diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
new file mode 100644
index 0000000..9efee21
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * S3 simulation container
+ */
+@Slf4j
+public class S3Container extends ChaosContainer<S3Container> {
+
+    public static final String NAME = "s3";
+    private static final String IMAGE_NAME = "apachepulsar/s3mock:latest";
+    private final String hostname;
+
+    public S3Container(String clusterName, String hostname) {
+        super(clusterName, IMAGE_NAME);
+        this.hostname = hostname;
+        this.withEnv("initialBuckets", "pulsar-integtest");
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + hostname;
+    }
+
+    @Override
+    public void start() {
+        this.withCreateContainerCmdModifier(createContainerCmd -> {
+            createContainerCmd.withHostName(hostname);
+            createContainerCmd.withName(getContainerName());
+        });
+
+        super.start();
+        log.info("Start s3 service");
+    }
+}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 040d59d..0f0bcff 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.topologies;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
+import static org.apache.pulsar.tests.containers.PulsarContainer.ZK_PORT;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
+import com.google.common.collect.Streams;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.containers.BKContainer;
@@ -53,9 +55,10 @@ public class PulsarCluster {
 
     public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
     public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
+    public static final String PULSAR_COMMAND_SCRIPT = "/pulsar/bin/pulsar";
 
     /**
-     * Pulsar Cluster Spec.
+     * Pulsar Cluster Spec
      *
      * @param spec pulsar cluster spec.
      * @return the built pulsar cluster
@@ -106,6 +109,31 @@ public class PulsarCluster {
             .withEnv("zookeeperServers", ZKContainer.NAME)
             .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
             .withEnv("clusterName", clusterName);
+
+        // create bookies
+        bookieContainers.putAll(
+                runNumContainers("bookie", spec.numBookies(), (name) -> new 
BKContainer(clusterName, name)
+                        .withNetwork(network)
+                        .withNetworkAliases(name)
+                        .withEnv("zkServers", ZKContainer.NAME)
+                        .withEnv("useHostNameAsBookieID", "true")
+                        .withEnv("clusterName", clusterName)
+                )
+        );
+
+        // create brokers
+        brokerContainers.putAll(
+                runNumContainers("broker", spec.numBrokers(), (name) -> new 
BrokerContainer(clusterName, name)
+                        .withNetwork(network)
+                        .withNetworkAliases(name)
+                        .withEnv("zkServers", ZKContainer.NAME)
+                        .withEnv("zookeeperServers", ZKContainer.NAME)
+                        .withEnv("configurationStoreServers", CSContainer.NAME 
+ ":" + CS_PORT)
+                        .withEnv("clusterName", clusterName)
+                        
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+                )
+        );
+
     }
 
     public String getPlainTextServiceUrl() {
@@ -116,6 +144,10 @@ public class PulsarCluster {
         return proxyContainer.getHttpServiceUrl();
     }
 
+    public String getZKConnString() {
+        return zkContainer.getContainerIpAddress() + ":" + 
zkContainer.getMappedPort(ZK_PORT);
+    }
+
     public void start() throws Exception {
         // start the local zookeeper
         zkContainer.start();
@@ -130,29 +162,13 @@ public class PulsarCluster {
             "bin/init-cluster.sh");
         log.info("Successfully initialized the cluster.");
 
-        // create bookies
-        bookieContainers.putAll(
-            runNumContainers("bookie", spec.numBookies(), (name) -> new 
BKContainer(clusterName, name)
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                .withEnv("zkServers", ZKContainer.NAME)
-                .withEnv("useHostNameAsBookieID", "true")
-                .withEnv("clusterName", clusterName)
-            )
-        );
+        // start bookies
+        bookieContainers.values().forEach(BKContainer::start);
+        log.info("Successfully started {} bookie conntainers.", 
bookieContainers.size());
 
-        // create brokers
-        brokerContainers.putAll(
-            runNumContainers("broker", spec.numBrokers(), (name) -> new 
BrokerContainer(clusterName, name)
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                .withEnv("zkServers", ZKContainer.NAME)
-                .withEnv("zookeeperServers", ZKContainer.NAME)
-                .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
-                .withEnv("clusterName", clusterName)
-                .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", 
"1")
-            )
-        );
+        // start brokers
+        this.startAllBrokers();
+        log.info("Successfully started {} broker conntainers.", 
brokerContainers.size());
 
         // create proxy
         proxyContainer.start();
@@ -175,12 +191,12 @@ public class PulsarCluster {
         }
 
         // start external services
-        Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
+        final Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
         if (null != externalServices) {
-            externalServices.entrySet().forEach(service -> {
-                GenericContainer<?> serviceContainer = service.getValue();
-                serviceContainer.withNetwork(network);
-                serviceContainer.start();
+            externalServices.entrySet().parallelStream().forEach(service -> {
+                service.getValue().withNetwork(network);
+                service.getValue().withNetworkAliases(service.getKey());
+                service.getValue().start();
                 log.info("Successfully start external service {}.", 
service.getKey());
             });
         }
@@ -195,24 +211,25 @@ public class PulsarCluster {
             String name = "pulsar-" + serviceName + "-" + i;
             T container = containerCreator.apply(name);
             containers.put(name, container);
-            startFutures.add(CompletableFuture.runAsync(() -> 
container.start()));
         }
-        CompletableFuture.allOf(startFutures.toArray(new 
CompletableFuture[startFutures.size()])).join();
-        log.info("Successfully started {} {} containers", numContainers, 
serviceName);
         return containers;
     }
 
     public void stop() {
-        Stream.of(proxyContainer, csContainer, 
zkContainer).parallel().forEach(GenericContainer::stop);
-        
workerContainers.values().parallelStream().forEach(GenericContainer::stop);
-        
brokerContainers.values().parallelStream().forEach(GenericContainer::stop);
-        
bookieContainers.values().parallelStream().forEach(GenericContainer::stop);
-        if (null != spec.externalServices()) {
-            spec.externalServices().values()
-                .parallelStream()
-                .forEach(GenericContainer::stop);
+
+        Stream<GenericContainer> containers = Streams.concat(
+                workerContainers.values().stream(),
+                brokerContainers.values().stream(),
+                bookieContainers.values().stream(),
+                Stream.of(proxyContainer, csContainer, zkContainer)
+        );
+
+        if (spec.externalServices() != null) {
+            containers = Streams.concat(containers, 
spec.externalServices().values().stream());
         }
 
+        containers.parallel().forEach(GenericContainer::stop);
+
         try {
             network.close();
         } catch (Exception e) {
@@ -242,6 +259,7 @@ public class PulsarCluster {
                 // bookkeeper tools
                 .withEnv("zkServers", ZKContainer.NAME)
         ));
+        this.startWorkers();
     }
 
     private void startFunctionWorkersWithThreadContainerFactory(int 
numFunctionWorkers) {
@@ -267,6 +285,13 @@ public class PulsarCluster {
                 // bookkeeper tools
                 .withEnv("zkServers", ZKContainer.NAME)
         ));
+        this.startWorkers();
+    }
+
+    private void startWorkers() {
+        // Start workers that have been initialized
+        
workerContainers.values().parallelStream().forEach(WorkerContainer::start);
+        log.info("Successfully started {} worker conntainers.", 
workerContainers.size());
     }
 
     public BrokerContainer getAnyBroker() {
@@ -290,13 +315,29 @@ public class PulsarCluster {
     }
 
     public ExecResult runAdminCommandOnAnyBroker(String...commands) throws 
Exception {
+        return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
+    }
+
+    public ExecResult runPulsarBaseCommandOnAnyBroker(String...commands) 
throws Exception {
+        return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, 
commands);
+    }
+
+    private ExecResult runCommandOnAnyBrokerWithScript(String scriptType, 
String...commands) throws Exception {
         BrokerContainer container = getAnyBroker();
         String[] cmds = new String[commands.length + 1];
-        cmds[0] = ADMIN_SCRIPT;
+        cmds[0] = scriptType;
         System.arraycopy(commands, 0, cmds, 1, commands.length);
         return container.execCmd(cmds);
     }
 
+    public void stopAllBrokers() {
+        brokerContainers.values().forEach(BrokerContainer::stop);
+    }
+
+    public void startAllBrokers() {
+        brokerContainers.values().forEach(BrokerContainer::start);
+    }
+
     public ExecResult createNamespace(String nsName) throws Exception {
         return runAdminCommandOnAnyBroker(
             "namespaces", "create", "public/" + nsName,
@@ -308,4 +349,5 @@ public class PulsarCluster {
             "namespaces", "set-deduplication", "public/" + nsName,
             enabled ? "--enable" : "--disable");
     }
+
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
index fd830e5..ceb30b5 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.topologies;
 
+import java.util.Collections;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Builder.Default;
@@ -26,7 +27,6 @@ import lombok.Setter;
 import lombok.experimental.Accessors;
 import org.apache.pulsar.tests.containers.ChaosContainer;
 import org.testcontainers.containers.GenericContainer;
-import org.testng.collections.Maps;
 
 /**
  * Spec to build a pulsar cluster.
@@ -90,13 +90,14 @@ public class PulsarClusterSpec {
      *
      * @return the list of external services to start with the cluster.
      */
-    Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+    Map<String, GenericContainer<?>> externalServices = Collections.EMPTY_MAP;
 
     /**
      * Returns the flag whether to enable/disable container log.
      *
      * @return the flag whether to enable/disable container log.
      */
+    @Default
     boolean enableContainerLog = false;
 
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index b88dee7..addb04a 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -34,26 +34,37 @@ public abstract class PulsarClusterTestBase {
     @DataProvider(name = "ServiceUrlAndTopics")
     public static Object[][] serviceUrlAndTopics() {
         return new Object[][] {
-            // plain text, persistent topic
-            {
-                pulsarCluster.getPlainTextServiceUrl(),
-                true,
-            },
-            // plain text, non-persistent topic
-            {
-                pulsarCluster.getPlainTextServiceUrl(),
-                false
-            }
+                // plain text, persistent topic
+                {
+                        pulsarCluster.getPlainTextServiceUrl(),
+                        true,
+                },
+                // plain text, non-persistent topic
+                {
+                        pulsarCluster.getPlainTextServiceUrl(),
+                        false
+                }
         };
     }
 
     @DataProvider(name = "ServiceUrls")
     public static Object[][] serviceUrls() {
         return new Object[][] {
-            // plain text
-            {
-                pulsarCluster.getPlainTextServiceUrl()
-            }
+                // plain text
+                {
+                        pulsarCluster.getPlainTextServiceUrl()
+                }
+        };
+    }
+
+    @DataProvider(name = "ServiceAndAdminUrls")
+    public static Object[][] serviceAndAdminUrls() {
+        return new Object[][] {
+                // plain text
+                {
+                        pulsarCluster.getPlainTextServiceUrl(),
+                        pulsarCluster.getHttpServiceUrl()
+                }
         };
     }
 
@@ -66,24 +77,24 @@ public abstract class PulsarClusterTestBase {
 
     public void setupCluster(String namePrefix) throws Exception {
         String clusterName = Stream.of(this.getClass().getSimpleName(), 
namePrefix, randomName(5))
-            .filter(s -> s != null && !s.isEmpty())
-            .collect(joining("-"));
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
 
         PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder = 
PulsarClusterSpec.builder()
-            .clusterName(clusterName);
+                .clusterName(clusterName);
 
         setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
     }
 
     protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
-        String clusterName,
-        PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+            String clusterName,
+            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
         return specBuilder;
     }
 
     protected void setupCluster(PulsarClusterSpec spec) throws Exception {
         log.info("Setting up cluster {} with {} bookies, {} brokers",
-            spec.clusterName(), spec.numBookies(), spec.numBrokers());
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
 
         pulsarCluster = PulsarCluster.forSpec(spec);
         pulsarCluster.start();
@@ -116,11 +127,11 @@ public abstract class PulsarClusterTestBase {
 
     protected static String generateTopicName(String namespace, String 
topicPrefix, boolean isPersistent) {
         String topicName = new StringBuilder(topicPrefix)
-            .append("-")
-            .append(randomName(8))
-            .append("-")
-            .append(System.currentTimeMillis())
-            .toString();
+                .append("-")
+                .append(randomName(8))
+                .append("-")
+                .append(System.currentTimeMillis())
+                .toString();
         if (isPersistent) {
             return "persistent://public/" + namespace + "/" + topicName;
         } else {
diff --git a/tests/integration/compaction/pom.xml 
b/tests/integration/compaction/pom.xml
index c80836f..6ef6f57 100644
--- a/tests/integration/compaction/pom.xml
+++ b/tests/integration/compaction/pom.xml
@@ -38,10 +38,17 @@
   <name>Apache Pulsar :: Tests :: Integration Tests :: Topic Compaction</name>
 
   <dependencies>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client</artifactId>
-      <version>${project.version}</version>
-    </dependency>
+      <dependency>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.pulsar.tests</groupId>
+          <artifactId>integration-tests-topologies</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+      </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index a716f9d..9bee227 100644
--- 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++ 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -18,53 +18,31 @@
  */
 package org.apache.pulsar.tests.integration;
 
-import com.github.dockerjava.api.DockerClient;
-
-import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestCompaction extends Arquillian {
-    private static String clusterName = "test";
+import static java.util.stream.Collectors.joining;
 
-    @ArquillianResource
-    DockerClient docker;
+public class TestCompaction extends PulsarClusterTestBase {
 
-    @BeforeMethod
-    public void waitServicesUp() throws Exception {
-        Assert.assertTrue(PulsarClusterUtils.waitZooKeeperUp(docker, 
clusterName, 30, TimeUnit.SECONDS));
-        Assert.assertTrue(PulsarClusterUtils.waitAllBrokersUp(docker, 
clusterName));
-    }
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishCompactAndConsumeCLI(String serviceUrl) throws 
Exception {
+
+        final String tenant = "compaction-test-cli-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
 
-    @Test
-    public void testPublishCompactAndConsumeCLI() throws Exception {
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
-                                          "create", "compaction-test-cli", 
"--allowed-clusters", clusterName,
-                                          "--admin-roles", "admin");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "create", "compaction-test-cli/ns1");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "set-clusters", "--clusters", "test", 
"compaction-test-cli/ns1");
-
-        String brokerIp = DockerUtils.getContainerIP(
-                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
-        String serviceUrl = "pulsar://" + brokerIp + ":6650";
-        String topic = "persistent://compaction-test-cli/ns1/topic1";
+        this.createNamespace(namespace);
 
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -85,9 +63,7 @@ public class TestCompaction extends Arquillian {
                 Assert.assertEquals(m.getData(), "content1".getBytes());
             }
 
-            PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                              PulsarClusterUtils.PULSAR, 
"compact-topic",
-                                              "-t", topic);
+            pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", 
"-t", topic);
 
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
@@ -98,23 +74,19 @@ public class TestCompaction extends Arquillian {
         }
     }
 
-    @Test
-    public void testPublishCompactAndConsumeRest() throws Exception {
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
-                                          "create", "compaction-test-rest", 
"--allowed-clusters", clusterName,
-                                          "--admin-roles", "admin");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "create", "compaction-test-rest/ns1");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "set-clusters", "--clusters", "test", 
"compaction-test-rest/ns1");
-
-        String brokerIp = DockerUtils.getContainerIP(
-                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
-        String serviceUrl = "pulsar://" + brokerIp + ":6650";
-        String topic = "persistent://compaction-test-rest/ns1/topic1";
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishCompactAndConsumeRest(String serviceUrl) throws 
Exception {
+
+        final String tenant = "compaction-test-rest-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-clusters", "--clusters", pulsarCluster.getClusterName(), 
namespace);
 
         try (PulsarClient client = PulsarClient.create(serviceUrl)) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -134,12 +106,11 @@ public class TestCompaction extends Arquillian {
                 Assert.assertEquals(m.getKey(), "key0");
                 Assert.assertEquals(m.getData(), "content1".getBytes());
             }
-            PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                    PulsarClusterUtils.PULSAR_ADMIN, "persistent", "compact", 
topic);
+            pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+                    "compact", topic);
 
-            PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                              "/pulsar/bin/pulsar-admin", 
"persistent", "compaction-status",
-                                              "-w", topic);
+            pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+                "compaction-status", "-w", topic);
 
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
@@ -171,24 +142,19 @@ public class TestCompaction extends Arquillian {
         }
     }
 
-    @Test
-    public void testPublishWithAutoCompaction() throws Exception {
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
-                                          "create", "compaction-test-auto",
-                                          "--allowed-clusters", clusterName,
-                                          "--admin-roles", "admin");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "create", "--clusters", "test", "compaction-test-auto/ns1");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "set-compaction-threshold", "--threshold", "1", 
"compaction-test-auto/ns1");
-
-        String brokerIp = DockerUtils.getContainerIP(
-                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
-        String serviceUrl = "pulsar://" + brokerIp + ":6650";
-        String topic = "persistent://compaction-test-auto/ns1/topic1";
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishWithAutoCompaction(String serviceUrl) throws 
Exception {
+
+        final String tenant = "compaction-test-auto-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-compaction-threshold", "--threshold", "1", namespace);
 
         try (PulsarClient client = PulsarClient.create(serviceUrl)) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -207,4 +173,20 @@ public class TestCompaction extends Arquillian {
         }
     }
 
+    private Container.ExecResult createTenantName(final String tenantName,
+                                                  final String 
allowedClusterName,
+                                                  final String adminRoleName) 
throws Exception {
+        return pulsarCluster.runAdminCommandOnAnyBroker(
+            "tenants", "create", "--allowed-clusters", allowedClusterName,
+            "--admin-roles", adminRoleName, tenantName);
+    }
+
+    private Container.ExecResult createNamespace(final String Ns) throws 
Exception {
+        return pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces",
+                "create",
+                "--clusters",
+                pulsarCluster.getClusterName(), Ns);
+    }
+
 }
diff --git a/tests/integration/compaction/src/test/resources/arquillian.xml 
b/tests/integration/compaction/src/test/resources/arquillian.xml
deleted file mode 100644
index 821d6e6..0000000
--- a/tests/integration/compaction/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-            xmlns="http://jboss.org/schema/arquillian";
-            xsi:schemaLocation="http://jboss.org/schema/arquillian
-                                
http://jboss.org/schema/arquillian/arquillian_1_0.xsd";>
-
-  <extension qualifier="docker">
-    <property name="definitionFormat">CUBE</property>
-    <property 
name="dockerContainersResource">cube-definitions/single-cluster-3-bookie-2-broker.yaml</property>
-  </extension>
-
-</arquillian>
diff --git a/tests/integration/s3-offload/pom.xml 
b/tests/integration/s3-offload/pom.xml
index c32a996..2dbe1bb 100644
--- a/tests/integration/s3-offload/pom.xml
+++ b/tests/integration/s3-offload/pom.xml
@@ -40,11 +40,6 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client-admin-original</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -53,5 +48,11 @@
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.pulsar.tests</groupId>
+        <artifactId>integration-tests-topologies</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index 4248976..0e8f48f 100644
--- a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++ b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.tests.integration;
 
-import com.github.dockerjava.api.DockerClient;
-import com.google.common.collect.ImmutableMap;
-
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableMap;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
@@ -33,58 +36,61 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.containers.BrokerContainer;
+import org.apache.pulsar.tests.containers.S3Container;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import static java.util.stream.Collectors.joining;
 
-public class TestS3Offload extends Arquillian {
-    private static final Logger log = 
LoggerFactory.getLogger(TestS3Offload.class);
+@Slf4j
+public class TestS3Offload extends PulsarClusterTestBase {
 
-    private static final String CLUSTER_NAME = "test";
     private static final int ENTRY_SIZE = 1024;
     private static final int ENTRIES_PER_LEDGER = 1024;
 
-    @ArquillianResource
-    DockerClient docker;
-
-    @BeforeMethod
-    public void configureAndStartBrokers() throws Exception {
-
-        String s3ip = DockerUtils.cubeIdsWithLabels(
-                docker, ImmutableMap.of("service", "s3", "cluster", 
CLUSTER_NAME))
-            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
-
-        String brokerConfFile = "/pulsar/conf/broker.conf";
-        for (String b : PulsarClusterUtils.brokerSet(docker, CLUSTER_NAME)) {
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "managedLedgerMinLedgerRolloverTimeMinutes", "0");
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "managedLedgerOffloadDriver", "s3");
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "s3ManagedLedgerOffloadBucket", "pulsar-integtest");
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "s3ManagedLedgerOffloadServiceEndpoint", "http://" + s3ip + ":9090");
+    @Override
+    @BeforeClass
+    public void setupCluster() throws Exception {
+
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), 
randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .numBookies(2)
+            .numBrokers(1)
+            .externalServices(ImmutableMap.of(S3Container.NAME, new 
S3Container(clusterName, S3Container.NAME)))
+            .clusterName(clusterName)
+            .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+
+        for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
+            brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
+            
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+            brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
+            brokerContainer.withEnv("s3ManagedLedgerOffloadBucket", 
"pulsar-integtest");
+            brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
         }
 
-        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
-        Assert.assertTrue(PulsarClusterUtils.startAllProxies(docker, 
CLUSTER_NAME));
-    }
+        pulsarCluster.start();
 
-    @AfterMethod
-    public void teardownBrokers() throws Exception {
-        PulsarClusterUtils.stopAllProxies(docker, CLUSTER_NAME);
-        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+        log.info("Cluster {} is setup", spec.clusterName());
     }
 
     private static byte[] buildEntry(String pattern) {
@@ -97,165 +103,151 @@ public class TestS3Offload extends Arquillian {
         return entry;
     }
 
-    @Test
-    public void testPublishOffloadAndConsumeViaCLI() throws Exception {
-        final String TENANT = "s3-offload-test-cli";
-        final String NAMESPACE = "s3-offload-test-cli/ns1";
-        final String TOPIC = "persistent://s3-offload-test-cli/ns1/topic1";
-
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "tenants",
-                "create", "--allowed-clusters", CLUSTER_NAME,
-                "--admin-roles", "offload-admin", TENANT);
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
-
-        String broker = PulsarClusterUtils.brokerSet(docker, 
CLUSTER_NAME).stream().findFirst().get();
-        String proxyIp = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
-            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
-        String serviceUrl = "pulsar://" + proxyIp + ":6650";
-        String adminUrl = "http://" + proxyIp + ":8080";
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String 
adminUrl) throws Exception {
+        final String tenant = "s3-offload-test-cli-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
+            "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+            "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker(
+             "namespaces",
+            "create", "--clusters", pulsarCluster.getClusterName(), namespace);
 
         long firstLedger = -1;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(TOPIC)
+            Producer producer = client.newProducer().topic(topic)
                 .blockIfQueueFull(true).enableBatching(false).create();
             PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+            
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
             int i = 0;
-            for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
-                producer.sendAsync(buildEntry("offload-message"+i));
+            for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
             }
-            MessageId latestMessage = 
producer.send(buildEntry("offload-message"+i));
+            MessageId latestMessage = 
producer.send(buildEntry("offload-message" + i));
 
             // read managed ledger info, check ledgers exist
-            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+            firstLedger = 
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
             // first offload with a high threshold, nothing should offload
-            String output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "100G", TOPIC);
+
+            String output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "offload", "--size-threshold", "100G", topic).getStdout();
             Assert.assertTrue(output.contains("Nothing to offload"));
 
-            output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
TOPIC);
+            output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
+                "offload-status", topic).getStdout();
             Assert.assertTrue(output.contains("Offload has not been run"));
 
             // offload with a low threshold
-            output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "1M", TOPIC);
+            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "offload", "--size-threshold", "1M", topic).getStdout();
             Assert.assertTrue(output.contains("Offload triggered"));
 
-            output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
"-w", TOPIC);
+            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "offload-status", "-w", topic).getStdout();
             Assert.assertTrue(output.contains("Offload was a success"));
         }
 
         // stop brokers to clear all caches, open handles, etc
-        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.stopAllBrokers();
 
         // delete the first ledger, so that we cannot possibly read from it
         ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
         try (BookKeeper bk = new BookKeeper(bkConf)) {
             bk.deleteLedger(firstLedger);
         }
 
         // start all brokers again
-        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.startAllBrokers();
 
         log.info("Read back the data (which would be in that first ledger)");
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+            Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
                 Message m = consumer.receive(1, TimeUnit.MINUTES);
-                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
+                Assert.assertEquals(buildEntry("offload-message" + i), 
m.getData());
             }
         }
     }
 
-    @Test
-    public void testPublishOffloadAndConsumeViaThreshold() throws Exception {
-        final String TENANT = "s3-offload-test-threshold";
-        final String NAMESPACE = "s3-offload-test-threshold/ns1";
-        final String TOPIC = 
"persistent://s3-offload-test-threshold/ns1/topic1";
-
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "tenants",
-                "create", "--allowed-clusters", CLUSTER_NAME,
-                "--admin-roles", "offload-admin", TENANT);
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "set-offload-threshold", "--size", "1M", NAMESPACE);
-
-        String proxyIp  = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
-            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
-        String serviceUrl = "pulsar://" + proxyIp + ":6650";
-        String adminUrl = "http://" + proxyIp + ":8080";
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, 
String adminUrl) throws Exception {
+        final String tenant = "s3-offload-test-threshold-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-offload-threshold", "--size", "1M", namespace);
 
         long firstLedger = 0;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(TOPIC)
+            Producer producer = client.newProducer().topic(topic)
                 .blockIfQueueFull(true).enableBatching(false).create();
             PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
 
-            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+            
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll twice
-            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
-                producer.sendAsync(buildEntry("offload-message"+i));
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
             }
             producer.send(buildEntry("final-offload-message"));
 
-            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+            firstLedger = 
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
             // wait up to 30 seconds for offload to occur
-            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded; i++) {
+            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
                 Thread.sleep(100);
             }
-            
Assert.assertTrue(admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded);
+            
Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
         }
 
         // stop brokers to clear all caches, open handles, etc
-        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.stopAllBrokers();
 
         // delete the first ledger, so that we cannot possibly read from it
         ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
         try (BookKeeper bk = new BookKeeper(bkConf)) {
             bk.deleteLedger(firstLedger);
         }
 
         // start all brokers again
-        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.startAllBrokers();
 
         log.info("Read back the data (which would be in that first ledger)");
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-             Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+             Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
                 Message m = consumer.receive(1, TimeUnit.MINUTES);
-                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
+                Assert.assertEquals(buildEntry("offload-message" + i), 
m.getData());
             }
         }
 
         // try disabling
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "set-offload-threshold", "--size", "-1", NAMESPACE);
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-offload-threshold", "--size", "-1", namespace);
 
         // hard to validate that it has been disabled as we'd be waiting for
         // something _not_ to happen (i.e. waiting for ages), so just check
         try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            
Assert.assertEquals(admin.namespaces().getOffloadThreshold(NAMESPACE), -1L);
+            
Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
         }
     }
 }
diff --git a/tests/integration/s3-offload/src/test/resources/arquillian.xml b/tests/integration/s3-offload/src/test/resources/arquillian.xml
deleted file mode 100644
index 14714d2..0000000
--- a/tests/integration/s3-offload/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-            xmlns="http://jboss.org/schema/arquillian"
-            xsi:schemaLocation="http://jboss.org/schema/arquillian
-                                http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
-
-  <extension qualifier="docker">
-    <property name="definitionFormat">CUBE</property>
-    <property name="dockerContainersResource">cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml</property>
-  </extension>
-
-</arquillian>

Reply via email to