sijie closed pull request #2101: Migrate compaction and s3 offload test to 
testcontainers
URL: https://github.com/apache/incubator-pulsar/pull/2101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not show the original diff once such a pull request
has been merged):

diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
new file mode 100644
index 0000000000..9efee214c9
--- /dev/null
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * S3 simulation container
+ */
+@Slf4j
+public class S3Container extends ChaosContainer<S3Container> {
+
+    public static final String NAME = "s3";
+    private static final String IMAGE_NAME = "apachepulsar/s3mock:latest";
+    private final String hostname;
+
+    public S3Container(String clusterName, String hostname) {
+        super(clusterName, IMAGE_NAME);
+        this.hostname = hostname;
+        this.withEnv("initialBuckets", "pulsar-integtest");
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + hostname;
+    }
+
+    @Override
+    public void start() {
+        this.withCreateContainerCmdModifier(createContainerCmd -> {
+            createContainerCmd.withHostName(hostname);
+            createContainerCmd.withName(getContainerName());
+        });
+
+        super.start();
+        log.info("Start s3 service");
+    }
+}
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 040d59dad1..0f0bcffec5 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
+import static org.apache.pulsar.tests.containers.PulsarContainer.ZK_PORT;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -32,6 +33,7 @@
 import java.util.function.Function;
 import java.util.stream.Stream;
 
+import com.google.common.collect.Streams;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.containers.BKContainer;
@@ -53,9 +55,10 @@
 
     public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
     public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
+    public static final String PULSAR_COMMAND_SCRIPT = "/pulsar/bin/pulsar";
 
     /**
-     * Pulsar Cluster Spec.
+     * Pulsar Cluster Spec
      *
      * @param spec pulsar cluster spec.
      * @return the built pulsar cluster
@@ -106,6 +109,31 @@ private PulsarCluster(PulsarClusterSpec spec) {
             .withEnv("zookeeperServers", ZKContainer.NAME)
             .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
             .withEnv("clusterName", clusterName);
+
+        // create bookies
+        bookieContainers.putAll(
+                runNumContainers("bookie", spec.numBookies(), (name) -> new 
BKContainer(clusterName, name)
+                        .withNetwork(network)
+                        .withNetworkAliases(name)
+                        .withEnv("zkServers", ZKContainer.NAME)
+                        .withEnv("useHostNameAsBookieID", "true")
+                        .withEnv("clusterName", clusterName)
+                )
+        );
+
+        // create brokers
+        brokerContainers.putAll(
+                runNumContainers("broker", spec.numBrokers(), (name) -> new 
BrokerContainer(clusterName, name)
+                        .withNetwork(network)
+                        .withNetworkAliases(name)
+                        .withEnv("zkServers", ZKContainer.NAME)
+                        .withEnv("zookeeperServers", ZKContainer.NAME)
+                        .withEnv("configurationStoreServers", CSContainer.NAME 
+ ":" + CS_PORT)
+                        .withEnv("clusterName", clusterName)
+                        
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+                )
+        );
+
     }
 
     public String getPlainTextServiceUrl() {
@@ -116,6 +144,10 @@ public String getHttpServiceUrl() {
         return proxyContainer.getHttpServiceUrl();
     }
 
+    public String getZKConnString() {
+        return zkContainer.getContainerIpAddress() + ":" + 
zkContainer.getMappedPort(ZK_PORT);
+    }
+
     public void start() throws Exception {
         // start the local zookeeper
         zkContainer.start();
@@ -130,29 +162,13 @@ public void start() throws Exception {
             "bin/init-cluster.sh");
         log.info("Successfully initialized the cluster.");
 
-        // create bookies
-        bookieContainers.putAll(
-            runNumContainers("bookie", spec.numBookies(), (name) -> new 
BKContainer(clusterName, name)
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                .withEnv("zkServers", ZKContainer.NAME)
-                .withEnv("useHostNameAsBookieID", "true")
-                .withEnv("clusterName", clusterName)
-            )
-        );
+        // start bookies
+        bookieContainers.values().forEach(BKContainer::start);
+        log.info("Successfully started {} bookie conntainers.", 
bookieContainers.size());
 
-        // create brokers
-        brokerContainers.putAll(
-            runNumContainers("broker", spec.numBrokers(), (name) -> new 
BrokerContainer(clusterName, name)
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                .withEnv("zkServers", ZKContainer.NAME)
-                .withEnv("zookeeperServers", ZKContainer.NAME)
-                .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
-                .withEnv("clusterName", clusterName)
-                .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", 
"1")
-            )
-        );
+        // start brokers
+        this.startAllBrokers();
+        log.info("Successfully started {} broker conntainers.", 
brokerContainers.size());
 
         // create proxy
         proxyContainer.start();
@@ -175,12 +191,12 @@ public void start() throws Exception {
         }
 
         // start external services
-        Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
+        final Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
         if (null != externalServices) {
-            externalServices.entrySet().forEach(service -> {
-                GenericContainer<?> serviceContainer = service.getValue();
-                serviceContainer.withNetwork(network);
-                serviceContainer.start();
+            externalServices.entrySet().parallelStream().forEach(service -> {
+                service.getValue().withNetwork(network);
+                service.getValue().withNetworkAliases(service.getKey());
+                service.getValue().start();
                 log.info("Successfully start external service {}.", 
service.getKey());
             });
         }
@@ -195,24 +211,25 @@ public void start() throws Exception {
             String name = "pulsar-" + serviceName + "-" + i;
             T container = containerCreator.apply(name);
             containers.put(name, container);
-            startFutures.add(CompletableFuture.runAsync(() -> 
container.start()));
         }
-        CompletableFuture.allOf(startFutures.toArray(new 
CompletableFuture[startFutures.size()])).join();
-        log.info("Successfully started {} {} containers", numContainers, 
serviceName);
         return containers;
     }
 
     public void stop() {
-        Stream.of(proxyContainer, csContainer, 
zkContainer).parallel().forEach(GenericContainer::stop);
-        
workerContainers.values().parallelStream().forEach(GenericContainer::stop);
-        
brokerContainers.values().parallelStream().forEach(GenericContainer::stop);
-        
bookieContainers.values().parallelStream().forEach(GenericContainer::stop);
-        if (null != spec.externalServices()) {
-            spec.externalServices().values()
-                .parallelStream()
-                .forEach(GenericContainer::stop);
+
+        Stream<GenericContainer> containers = Streams.concat(
+                workerContainers.values().stream(),
+                brokerContainers.values().stream(),
+                bookieContainers.values().stream(),
+                Stream.of(proxyContainer, csContainer, zkContainer)
+        );
+
+        if (spec.externalServices() != null) {
+            containers = Streams.concat(containers, 
spec.externalServices().values().stream());
         }
 
+        containers.parallel().forEach(GenericContainer::stop);
+
         try {
             network.close();
         } catch (Exception e) {
@@ -242,6 +259,7 @@ private void 
startFunctionWorkersWithProcessContainerFactory(int numFunctionWork
                 // bookkeeper tools
                 .withEnv("zkServers", ZKContainer.NAME)
         ));
+        this.startWorkers();
     }
 
     private void startFunctionWorkersWithThreadContainerFactory(int 
numFunctionWorkers) {
@@ -267,6 +285,13 @@ private void 
startFunctionWorkersWithThreadContainerFactory(int numFunctionWorke
                 // bookkeeper tools
                 .withEnv("zkServers", ZKContainer.NAME)
         ));
+        this.startWorkers();
+    }
+
+    private void startWorkers() {
+        // Start workers that have been initialized
+        
workerContainers.values().parallelStream().forEach(WorkerContainer::start);
+        log.info("Successfully started {} worker conntainers.", 
workerContainers.size());
     }
 
     public BrokerContainer getAnyBroker() {
@@ -290,13 +315,29 @@ public WorkerContainer getAnyWorker() {
     }
 
     public ExecResult runAdminCommandOnAnyBroker(String...commands) throws 
Exception {
+        return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
+    }
+
+    public ExecResult runPulsarBaseCommandOnAnyBroker(String...commands) 
throws Exception {
+        return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, 
commands);
+    }
+
+    private ExecResult runCommandOnAnyBrokerWithScript(String scriptType, 
String...commands) throws Exception {
         BrokerContainer container = getAnyBroker();
         String[] cmds = new String[commands.length + 1];
-        cmds[0] = ADMIN_SCRIPT;
+        cmds[0] = scriptType;
         System.arraycopy(commands, 0, cmds, 1, commands.length);
         return container.execCmd(cmds);
     }
 
+    public void stopAllBrokers() {
+        brokerContainers.values().forEach(BrokerContainer::stop);
+    }
+
+    public void startAllBrokers() {
+        brokerContainers.values().forEach(BrokerContainer::start);
+    }
+
     public ExecResult createNamespace(String nsName) throws Exception {
         return runAdminCommandOnAnyBroker(
             "namespaces", "create", "public/" + nsName,
@@ -308,4 +349,5 @@ public ExecResult enableDeduplication(String nsName, 
boolean enabled) throws Exc
             "namespaces", "set-deduplication", "public/" + nsName,
             enabled ? "--enable" : "--disable");
     }
+
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
index fd830e56c1..ceb30b54cc 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.topologies;
 
+import java.util.Collections;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Builder.Default;
@@ -26,7 +27,6 @@
 import lombok.experimental.Accessors;
 import org.apache.pulsar.tests.containers.ChaosContainer;
 import org.testcontainers.containers.GenericContainer;
-import org.testng.collections.Maps;
 
 /**
  * Spec to build a pulsar cluster.
@@ -90,13 +90,14 @@
      *
      * @return the list of external services to start with the cluster.
      */
-    Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+    Map<String, GenericContainer<?>> externalServices = Collections.EMPTY_MAP;
 
     /**
      * Returns the flag whether to enable/disable container log.
      *
      * @return the flag whether to enable/disable container log.
      */
+    @Default
     boolean enableContainerLog = false;
 
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index b88dee768b..addb04a2df 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -34,26 +34,37 @@
     @DataProvider(name = "ServiceUrlAndTopics")
     public static Object[][] serviceUrlAndTopics() {
         return new Object[][] {
-            // plain text, persistent topic
-            {
-                pulsarCluster.getPlainTextServiceUrl(),
-                true,
-            },
-            // plain text, non-persistent topic
-            {
-                pulsarCluster.getPlainTextServiceUrl(),
-                false
-            }
+                // plain text, persistent topic
+                {
+                        pulsarCluster.getPlainTextServiceUrl(),
+                        true,
+                },
+                // plain text, non-persistent topic
+                {
+                        pulsarCluster.getPlainTextServiceUrl(),
+                        false
+                }
         };
     }
 
     @DataProvider(name = "ServiceUrls")
     public static Object[][] serviceUrls() {
         return new Object[][] {
-            // plain text
-            {
-                pulsarCluster.getPlainTextServiceUrl()
-            }
+                // plain text
+                {
+                        pulsarCluster.getPlainTextServiceUrl()
+                }
+        };
+    }
+
+    @DataProvider(name = "ServiceAndAdminUrls")
+    public static Object[][] serviceAndAdminUrls() {
+        return new Object[][] {
+                // plain text
+                {
+                        pulsarCluster.getPlainTextServiceUrl(),
+                        pulsarCluster.getHttpServiceUrl()
+                }
         };
     }
 
@@ -66,24 +77,24 @@ public void setupCluster() throws Exception {
 
     public void setupCluster(String namePrefix) throws Exception {
         String clusterName = Stream.of(this.getClass().getSimpleName(), 
namePrefix, randomName(5))
-            .filter(s -> s != null && !s.isEmpty())
-            .collect(joining("-"));
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
 
         PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder = 
PulsarClusterSpec.builder()
-            .clusterName(clusterName);
+                .clusterName(clusterName);
 
         setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
     }
 
     protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
-        String clusterName,
-        PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+            String clusterName,
+            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
         return specBuilder;
     }
 
     protected void setupCluster(PulsarClusterSpec spec) throws Exception {
         log.info("Setting up cluster {} with {} bookies, {} brokers",
-            spec.clusterName(), spec.numBookies(), spec.numBrokers());
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
 
         pulsarCluster = PulsarCluster.forSpec(spec);
         pulsarCluster.start();
@@ -116,11 +127,11 @@ protected static String generateTopicName(String 
topicPrefix, boolean isPersiste
 
     protected static String generateTopicName(String namespace, String 
topicPrefix, boolean isPersistent) {
         String topicName = new StringBuilder(topicPrefix)
-            .append("-")
-            .append(randomName(8))
-            .append("-")
-            .append(System.currentTimeMillis())
-            .toString();
+                .append("-")
+                .append(randomName(8))
+                .append("-")
+                .append(System.currentTimeMillis())
+                .toString();
         if (isPersistent) {
             return "persistent://public/" + namespace + "/" + topicName;
         } else {
diff --git a/tests/integration/compaction/pom.xml 
b/tests/integration/compaction/pom.xml
index c80836f8c6..6ef6f576c1 100644
--- a/tests/integration/compaction/pom.xml
+++ b/tests/integration/compaction/pom.xml
@@ -38,10 +38,17 @@
   <name>Apache Pulsar :: Tests :: Integration Tests :: Topic Compaction</name>
 
   <dependencies>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client</artifactId>
-      <version>${project.version}</version>
-    </dependency>
+      <dependency>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.pulsar.tests</groupId>
+          <artifactId>integration-tests-topologies</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+      </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index a716f9d8fd..9bee2273cf 100644
--- 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++ 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -18,53 +18,31 @@
  */
 package org.apache.pulsar.tests.integration;
 
-import com.github.dockerjava.api.DockerClient;
-
-import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestCompaction extends Arquillian {
-    private static String clusterName = "test";
+import static java.util.stream.Collectors.joining;
 
-    @ArquillianResource
-    DockerClient docker;
+public class TestCompaction extends PulsarClusterTestBase {
 
-    @BeforeMethod
-    public void waitServicesUp() throws Exception {
-        Assert.assertTrue(PulsarClusterUtils.waitZooKeeperUp(docker, 
clusterName, 30, TimeUnit.SECONDS));
-        Assert.assertTrue(PulsarClusterUtils.waitAllBrokersUp(docker, 
clusterName));
-    }
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishCompactAndConsumeCLI(String serviceUrl) throws 
Exception {
+
+        final String tenant = "compaction-test-cli-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
 
-    @Test
-    public void testPublishCompactAndConsumeCLI() throws Exception {
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
-                                          "create", "compaction-test-cli", 
"--allowed-clusters", clusterName,
-                                          "--admin-roles", "admin");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "create", "compaction-test-cli/ns1");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "set-clusters", "--clusters", "test", 
"compaction-test-cli/ns1");
-
-        String brokerIp = DockerUtils.getContainerIP(
-                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
-        String serviceUrl = "pulsar://" + brokerIp + ":6650";
-        String topic = "persistent://compaction-test-cli/ns1/topic1";
+        this.createNamespace(namespace);
 
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -85,9 +63,7 @@ public void testPublishCompactAndConsumeCLI() throws 
Exception {
                 Assert.assertEquals(m.getData(), "content1".getBytes());
             }
 
-            PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                              PulsarClusterUtils.PULSAR, 
"compact-topic",
-                                              "-t", topic);
+            pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", 
"-t", topic);
 
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
@@ -98,23 +74,19 @@ public void testPublishCompactAndConsumeCLI() throws 
Exception {
         }
     }
 
-    @Test
-    public void testPublishCompactAndConsumeRest() throws Exception {
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
-                                          "create", "compaction-test-rest", 
"--allowed-clusters", clusterName,
-                                          "--admin-roles", "admin");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "create", "compaction-test-rest/ns1");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "set-clusters", "--clusters", "test", 
"compaction-test-rest/ns1");
-
-        String brokerIp = DockerUtils.getContainerIP(
-                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
-        String serviceUrl = "pulsar://" + brokerIp + ":6650";
-        String topic = "persistent://compaction-test-rest/ns1/topic1";
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishCompactAndConsumeRest(String serviceUrl) throws 
Exception {
+
+        final String tenant = "compaction-test-rest-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-clusters", "--clusters", pulsarCluster.getClusterName(), 
namespace);
 
         try (PulsarClient client = PulsarClient.create(serviceUrl)) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -134,12 +106,11 @@ public void testPublishCompactAndConsumeRest() throws 
Exception {
                 Assert.assertEquals(m.getKey(), "key0");
                 Assert.assertEquals(m.getData(), "content1".getBytes());
             }
-            PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                    PulsarClusterUtils.PULSAR_ADMIN, "persistent", "compact", 
topic);
+            pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+                    "compact", topic);
 
-            PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                              "/pulsar/bin/pulsar-admin", 
"persistent", "compaction-status",
-                                              "-w", topic);
+            pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+                "compaction-status", "-w", topic);
 
             try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
@@ -171,24 +142,19 @@ private static void waitAndVerifyCompacted(PulsarClient 
client, String topic,
         }
     }
 
-    @Test
-    public void testPublishWithAutoCompaction() throws Exception {
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
-                                          "create", "compaction-test-auto",
-                                          "--allowed-clusters", clusterName,
-                                          "--admin-roles", "admin");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "create", "--clusters", "test", "compaction-test-auto/ns1");
-        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
-                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
-                "set-compaction-threshold", "--threshold", "1", 
"compaction-test-auto/ns1");
-
-        String brokerIp = DockerUtils.getContainerIP(
-                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
-        String serviceUrl = "pulsar://" + brokerIp + ":6650";
-        String topic = "persistent://compaction-test-auto/ns1/topic1";
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishWithAutoCompaction(String serviceUrl) throws 
Exception {
+
+        final String tenant = "compaction-test-auto-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-compaction-threshold", "--threshold", "1", namespace);
 
         try (PulsarClient client = PulsarClient.create(serviceUrl)) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -207,4 +173,20 @@ public void testPublishWithAutoCompaction() throws 
Exception {
         }
     }
 
+    private Container.ExecResult createTenantName(final String tenantName,
+                                                  final String 
allowedClusterName,
+                                                  final String adminRoleName) 
throws Exception {
+        return pulsarCluster.runAdminCommandOnAnyBroker(
+            "tenants", "create", "--allowed-clusters", allowedClusterName,
+            "--admin-roles", adminRoleName, tenantName);
+    }
+
+    private Container.ExecResult createNamespace(final String Ns) throws 
Exception {
+        return pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces",
+                "create",
+                "--clusters",
+                pulsarCluster.getClusterName(), Ns);
+    }
+
 }
diff --git a/tests/integration/compaction/src/test/resources/arquillian.xml 
b/tests/integration/compaction/src/test/resources/arquillian.xml
deleted file mode 100644
index 821d6e6e09..0000000000
--- a/tests/integration/compaction/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-            xmlns="http://jboss.org/schema/arquillian";
-            xsi:schemaLocation="http://jboss.org/schema/arquillian
-                                
http://jboss.org/schema/arquillian/arquillian_1_0.xsd";>
-
-  <extension qualifier="docker">
-    <property name="definitionFormat">CUBE</property>
-    <property 
name="dockerContainersResource">cube-definitions/single-cluster-3-bookie-2-broker.yaml</property>
-  </extension>
-
-</arquillian>
diff --git a/tests/integration/s3-offload/pom.xml 
b/tests/integration/s3-offload/pom.xml
index c32a9965ac..2dbe1bb6aa 100644
--- a/tests/integration/s3-offload/pom.xml
+++ b/tests/integration/s3-offload/pom.xml
@@ -38,11 +38,6 @@
   <name>Apache Pulsar :: Tests :: Integration Tests :: S3 Offload</name>
 
   <dependencies>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client-admin-original</artifactId>
@@ -53,5 +48,11 @@
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.pulsar.tests</groupId>
+        <artifactId>integration-tests-topologies</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
 
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index 42489767ea..0e8f48f3fb 100644
--- 
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++ 
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.tests.integration;
 
-import com.github.dockerjava.api.DockerClient;
-import com.google.common.collect.ImmutableMap;
-
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableMap;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
@@ -33,58 +36,61 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.containers.BrokerContainer;
+import org.apache.pulsar.tests.containers.S3Container;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import static java.util.stream.Collectors.joining;
 
-public class TestS3Offload extends Arquillian {
-    private static final Logger log = 
LoggerFactory.getLogger(TestS3Offload.class);
+@Slf4j
+public class TestS3Offload extends PulsarClusterTestBase {
 
-    private static final String CLUSTER_NAME = "test";
     private static final int ENTRY_SIZE = 1024;
     private static final int ENTRIES_PER_LEDGER = 1024;
 
-    @ArquillianResource
-    DockerClient docker;
-
-    @BeforeMethod
-    public void configureAndStartBrokers() throws Exception {
-
-        String s3ip = DockerUtils.cubeIdsWithLabels(
-                docker, ImmutableMap.of("service", "s3", "cluster", 
CLUSTER_NAME))
-            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
-
-        String brokerConfFile = "/pulsar/conf/broker.conf";
-        for (String b : PulsarClusterUtils.brokerSet(docker, CLUSTER_NAME)) {
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "managedLedgerMinLedgerRolloverTimeMinutes", "0");
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "managedLedgerOffloadDriver", "s3");
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "s3ManagedLedgerOffloadBucket", "pulsar-integtest");
-            PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
-                    "s3ManagedLedgerOffloadServiceEndpoint", "http://" + s3ip + ":9090");
+    @Override
+    @BeforeClass
+    public void setupCluster() throws Exception {
+
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), 
randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .numBookies(2)
+            .numBrokers(1)
+            .externalServices(ImmutableMap.of(S3Container.NAME, new 
S3Container(clusterName, S3Container.NAME)))
+            .clusterName(clusterName)
+            .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+
+        for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
+            brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
+            
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+            brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
+            brokerContainer.withEnv("s3ManagedLedgerOffloadBucket", 
"pulsar-integtest");
+            brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
         }
 
-        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
-        Assert.assertTrue(PulsarClusterUtils.startAllProxies(docker, 
CLUSTER_NAME));
-    }
+        pulsarCluster.start();
 
-    @AfterMethod
-    public void teardownBrokers() throws Exception {
-        PulsarClusterUtils.stopAllProxies(docker, CLUSTER_NAME);
-        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+        log.info("Cluster {} is setup", spec.clusterName());
     }
 
     private static byte[] buildEntry(String pattern) {
@@ -97,165 +103,151 @@ public void teardownBrokers() throws Exception {
         return entry;
     }
 
-    @Test
-    public void testPublishOffloadAndConsumeViaCLI() throws Exception {
-        final String TENANT = "s3-offload-test-cli";
-        final String NAMESPACE = "s3-offload-test-cli/ns1";
-        final String TOPIC = "persistent://s3-offload-test-cli/ns1/topic1";
-
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "tenants",
-                "create", "--allowed-clusters", CLUSTER_NAME,
-                "--admin-roles", "offload-admin", TENANT);
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
-
-        String broker = PulsarClusterUtils.brokerSet(docker, 
CLUSTER_NAME).stream().findFirst().get();
-        String proxyIp = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
-            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
-        String serviceUrl = "pulsar://" + proxyIp + ":6650";
-        String adminUrl = "http://" + proxyIp + ":8080";
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String 
adminUrl) throws Exception {
+        final String tenant = "s3-offload-test-cli-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
+            "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+            "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker(
+             "namespaces",
+            "create", "--clusters", pulsarCluster.getClusterName(), namespace);
 
         long firstLedger = -1;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(TOPIC)
+            Producer producer = client.newProducer().topic(topic)
                 .blockIfQueueFull(true).enableBatching(false).create();
             PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+            
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
             int i = 0;
-            for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
-                producer.sendAsync(buildEntry("offload-message"+i));
+            for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
             }
-            MessageId latestMessage = 
producer.send(buildEntry("offload-message"+i));
+            MessageId latestMessage = 
producer.send(buildEntry("offload-message" + i));
 
             // read managed ledger info, check ledgers exist
-            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+            firstLedger = 
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
             // first offload with a high threshold, nothing should offload
-            String output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "100G", TOPIC);
+
+            String output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "offload", "--size-threshold", "100G", topic).getStdout();
             Assert.assertTrue(output.contains("Nothing to offload"));
 
-            output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
TOPIC);
+            output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
+                "offload-status", topic).getStdout();
             Assert.assertTrue(output.contains("Offload has not been run"));
 
             // offload with a low threshold
-            output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "1M", TOPIC);
+            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "offload", "--size-threshold", "1M", topic).getStdout();
             Assert.assertTrue(output.contains("Offload triggered"));
 
-            output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
"-w", TOPIC);
+            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "offload-status", "-w", topic).getStdout();
             Assert.assertTrue(output.contains("Offload was a success"));
         }
 
         // stop brokers to clear all caches, open handles, etc
-        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.stopAllBrokers();
 
         // delete the first ledger, so that we cannot possibly read from it
         ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
         try (BookKeeper bk = new BookKeeper(bkConf)) {
             bk.deleteLedger(firstLedger);
         }
 
         // start all brokers again
-        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.startAllBrokers();
 
         log.info("Read back the data (which would be in that first ledger)");
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+            Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
                 Message m = consumer.receive(1, TimeUnit.MINUTES);
-                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
+                Assert.assertEquals(buildEntry("offload-message" + i), 
m.getData());
             }
         }
     }
 
-    @Test
-    public void testPublishOffloadAndConsumeViaThreshold() throws Exception {
-        final String TENANT = "s3-offload-test-threshold";
-        final String NAMESPACE = "s3-offload-test-threshold/ns1";
-        final String TOPIC = 
"persistent://s3-offload-test-threshold/ns1/topic1";
-
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "tenants",
-                "create", "--allowed-clusters", CLUSTER_NAME,
-                "--admin-roles", "offload-admin", TENANT);
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "set-offload-threshold", "--size", "1M", NAMESPACE);
-
-        String proxyIp  = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
-            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
-        String serviceUrl = "pulsar://" + proxyIp + ":6650";
-        String adminUrl = "http://" + proxyIp + ":8080";
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, 
String adminUrl) throws Exception {
+        final String tenant = "s3-offload-test-threshold-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-offload-threshold", "--size", "1M", namespace);
 
         long firstLedger = 0;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(TOPIC)
+            Producer producer = client.newProducer().topic(topic)
                 .blockIfQueueFull(true).enableBatching(false).create();
             PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
 
-            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+            
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll twice
-            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
-                producer.sendAsync(buildEntry("offload-message"+i));
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
             }
             producer.send(buildEntry("final-offload-message"));
 
-            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+            firstLedger = 
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
             // wait up to 30 seconds for offload to occur
-            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded; i++) {
+            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
                 Thread.sleep(100);
             }
-            
Assert.assertTrue(admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded);
+            
Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
         }
 
         // stop brokers to clear all caches, open handles, etc
-        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.stopAllBrokers();
 
         // delete the first ledger, so that we cannot possibly read from it
         ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
         try (BookKeeper bk = new BookKeeper(bkConf)) {
             bk.deleteLedger(firstLedger);
         }
 
         // start all brokers again
-        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
+        pulsarCluster.startAllBrokers();
 
         log.info("Read back the data (which would be in that first ledger)");
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-             Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+             Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
                 Message m = consumer.receive(1, TimeUnit.MINUTES);
-                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
+                Assert.assertEquals(buildEntry("offload-message" + i), 
m.getData());
             }
         }
 
         // try disabling
-        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
-                "/pulsar/bin/pulsar-admin", "namespaces",
-                "set-offload-threshold", "--size", "-1", NAMESPACE);
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-offload-threshold", "--size", "-1", namespace);
 
         // hard to validate that it has been disabled as we'd be waiting for
         // something _not_ to happen (i.e. waiting for ages), so just check
         try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            
Assert.assertEquals(admin.namespaces().getOffloadThreshold(NAMESPACE), -1L);
+            
Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
         }
     }
 }
diff --git a/tests/integration/s3-offload/src/test/resources/arquillian.xml 
b/tests/integration/s3-offload/src/test/resources/arquillian.xml
deleted file mode 100644
index 14714d2906..0000000000
--- a/tests/integration/s3-offload/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-            xmlns="http://jboss.org/schema/arquillian"
-            xsi:schemaLocation="http://jboss.org/schema/arquillian
-                                http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
-
-  <extension qualifier="docker">
-    <property name="definitionFormat">CUBE</property>
-    <property 
name="dockerContainersResource">cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml</property>
-  </extension>
-
-</arquillian>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to