This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b59248 Migrate compaction, s3offload to test containers (#2101)
7b59248 is described below
commit 7b59248ad87de2c9b96f56c542ff87a87a44e0f2
Author: Ali Ahmed <[email protected]>
AuthorDate: Tue Jul 17 02:42:25 2018 -0700
Migrate compaction, s3offload to test containers (#2101)
###Motivation
This is part of the migration effort from arquillian framework to
testcontainers.
---
.../pulsar/tests/containers/S3Container.java | 54 +++++
.../pulsar/tests/topologies/PulsarCluster.java | 122 +++++++----
.../pulsar/tests/topologies/PulsarClusterSpec.java | 5 +-
.../tests/topologies/PulsarClusterTestBase.java | 61 +++---
tests/integration/compaction/pom.xml | 17 +-
.../pulsar/tests/integration/TestCompaction.java | 140 ++++++-------
.../compaction/src/test/resources/arquillian.xml | 32 ---
tests/integration/s3-offload/pom.xml | 11 +-
.../pulsar/tests/integration/TestS3Offload.java | 228 ++++++++++-----------
.../s3-offload/src/test/resources/arquillian.xml | 32 ---
10 files changed, 364 insertions(+), 338 deletions(-)
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
new file mode 100644
index 0000000..9efee21
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/S3Container.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * S3 simulation container
+ */
+@Slf4j
+public class S3Container extends ChaosContainer<S3Container> {
+
+ public static final String NAME = "s3";
+ private static final String IMAGE_NAME = "apachepulsar/s3mock:latest";
+ private final String hostname;
+
+ public S3Container(String clusterName, String hostname) {
+ super(clusterName, IMAGE_NAME);
+ this.hostname = hostname;
+ this.withEnv("initialBuckets", "pulsar-integtest");
+ }
+
+ @Override
+ public String getContainerName() {
+ return clusterName + "-" + hostname;
+ }
+
+ @Override
+ public void start() {
+ this.withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(hostname);
+ createContainerCmd.withName(getContainerName());
+ });
+
+ super.start();
+ log.info("Start s3 service");
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 040d59d..0f0bcff 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.topologies;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
+import static org.apache.pulsar.tests.containers.PulsarContainer.ZK_PORT;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;
+import com.google.common.collect.Streams;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.containers.BKContainer;
@@ -53,9 +55,10 @@ public class PulsarCluster {
public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
+ public static final String PULSAR_COMMAND_SCRIPT = "/pulsar/bin/pulsar";
/**
- * Pulsar Cluster Spec.
+ * Pulsar Cluster Spec
*
* @param spec pulsar cluster spec.
* @return the built pulsar cluster
@@ -106,6 +109,31 @@ public class PulsarCluster {
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" +
CS_PORT)
.withEnv("clusterName", clusterName);
+
+ // create bookies
+ bookieContainers.putAll(
+ runNumContainers("bookie", spec.numBookies(), (name) -> new
BKContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv("useHostNameAsBookieID", "true")
+ .withEnv("clusterName", clusterName)
+ )
+ );
+
+ // create brokers
+ brokerContainers.putAll(
+ runNumContainers("broker", spec.numBrokers(), (name) -> new
BrokerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ .withEnv("configurationStoreServers", CSContainer.NAME
+ ":" + CS_PORT)
+ .withEnv("clusterName", clusterName)
+
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+ )
+ );
+
}
public String getPlainTextServiceUrl() {
@@ -116,6 +144,10 @@ public class PulsarCluster {
return proxyContainer.getHttpServiceUrl();
}
+ public String getZKConnString() {
+ return zkContainer.getContainerIpAddress() + ":" +
zkContainer.getMappedPort(ZK_PORT);
+ }
+
public void start() throws Exception {
// start the local zookeeper
zkContainer.start();
@@ -130,29 +162,13 @@ public class PulsarCluster {
"bin/init-cluster.sh");
log.info("Successfully initialized the cluster.");
- // create bookies
- bookieContainers.putAll(
- runNumContainers("bookie", spec.numBookies(), (name) -> new
BKContainer(clusterName, name)
- .withNetwork(network)
- .withNetworkAliases(name)
- .withEnv("zkServers", ZKContainer.NAME)
- .withEnv("useHostNameAsBookieID", "true")
- .withEnv("clusterName", clusterName)
- )
- );
+ // start bookies
+ bookieContainers.values().forEach(BKContainer::start);
+ log.info("Successfully started {} bookie conntainers.",
bookieContainers.size());
- // create brokers
- brokerContainers.putAll(
- runNumContainers("broker", spec.numBrokers(), (name) -> new
BrokerContainer(clusterName, name)
- .withNetwork(network)
- .withNetworkAliases(name)
- .withEnv("zkServers", ZKContainer.NAME)
- .withEnv("zookeeperServers", ZKContainer.NAME)
- .withEnv("configurationStoreServers", CSContainer.NAME + ":" +
CS_PORT)
- .withEnv("clusterName", clusterName)
- .withEnv("brokerServiceCompactionMonitorIntervalInSeconds",
"1")
- )
- );
+ // start brokers
+ this.startAllBrokers();
+ log.info("Successfully started {} broker conntainers.",
brokerContainers.size());
// create proxy
proxyContainer.start();
@@ -175,12 +191,12 @@ public class PulsarCluster {
}
// start external services
- Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
+ final Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
if (null != externalServices) {
- externalServices.entrySet().forEach(service -> {
- GenericContainer<?> serviceContainer = service.getValue();
- serviceContainer.withNetwork(network);
- serviceContainer.start();
+ externalServices.entrySet().parallelStream().forEach(service -> {
+ service.getValue().withNetwork(network);
+ service.getValue().withNetworkAliases(service.getKey());
+ service.getValue().start();
log.info("Successfully start external service {}.",
service.getKey());
});
}
@@ -195,24 +211,25 @@ public class PulsarCluster {
String name = "pulsar-" + serviceName + "-" + i;
T container = containerCreator.apply(name);
containers.put(name, container);
- startFutures.add(CompletableFuture.runAsync(() ->
container.start()));
}
- CompletableFuture.allOf(startFutures.toArray(new
CompletableFuture[startFutures.size()])).join();
- log.info("Successfully started {} {} containers", numContainers,
serviceName);
return containers;
}
public void stop() {
- Stream.of(proxyContainer, csContainer,
zkContainer).parallel().forEach(GenericContainer::stop);
-
workerContainers.values().parallelStream().forEach(GenericContainer::stop);
-
brokerContainers.values().parallelStream().forEach(GenericContainer::stop);
-
bookieContainers.values().parallelStream().forEach(GenericContainer::stop);
- if (null != spec.externalServices()) {
- spec.externalServices().values()
- .parallelStream()
- .forEach(GenericContainer::stop);
+
+ Stream<GenericContainer> containers = Streams.concat(
+ workerContainers.values().stream(),
+ brokerContainers.values().stream(),
+ bookieContainers.values().stream(),
+ Stream.of(proxyContainer, csContainer, zkContainer)
+ );
+
+ if (spec.externalServices() != null) {
+ containers = Streams.concat(containers,
spec.externalServices().values().stream());
}
+ containers.parallel().forEach(GenericContainer::stop);
+
try {
network.close();
} catch (Exception e) {
@@ -242,6 +259,7 @@ public class PulsarCluster {
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME)
));
+ this.startWorkers();
}
private void startFunctionWorkersWithThreadContainerFactory(int
numFunctionWorkers) {
@@ -267,6 +285,13 @@ public class PulsarCluster {
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME)
));
+ this.startWorkers();
+ }
+
+ private void startWorkers() {
+ // Start workers that have been initialized
+
workerContainers.values().parallelStream().forEach(WorkerContainer::start);
+ log.info("Successfully started {} worker conntainers.",
workerContainers.size());
}
public BrokerContainer getAnyBroker() {
@@ -290,13 +315,29 @@ public class PulsarCluster {
}
public ExecResult runAdminCommandOnAnyBroker(String...commands) throws
Exception {
+ return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
+ }
+
+ public ExecResult runPulsarBaseCommandOnAnyBroker(String...commands)
throws Exception {
+ return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT,
commands);
+ }
+
+ private ExecResult runCommandOnAnyBrokerWithScript(String scriptType,
String...commands) throws Exception {
BrokerContainer container = getAnyBroker();
String[] cmds = new String[commands.length + 1];
- cmds[0] = ADMIN_SCRIPT;
+ cmds[0] = scriptType;
System.arraycopy(commands, 0, cmds, 1, commands.length);
return container.execCmd(cmds);
}
+ public void stopAllBrokers() {
+ brokerContainers.values().forEach(BrokerContainer::stop);
+ }
+
+ public void startAllBrokers() {
+ brokerContainers.values().forEach(BrokerContainer::start);
+ }
+
public ExecResult createNamespace(String nsName) throws Exception {
return runAdminCommandOnAnyBroker(
"namespaces", "create", "public/" + nsName,
@@ -308,4 +349,5 @@ public class PulsarCluster {
"namespaces", "set-deduplication", "public/" + nsName,
enabled ? "--enable" : "--disable");
}
+
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
index fd830e5..ceb30b5 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.topologies;
+import java.util.Collections;
import java.util.Map;
import lombok.Builder;
import lombok.Builder.Default;
@@ -26,7 +27,6 @@ import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.pulsar.tests.containers.ChaosContainer;
import org.testcontainers.containers.GenericContainer;
-import org.testng.collections.Maps;
/**
* Spec to build a pulsar cluster.
@@ -90,13 +90,14 @@ public class PulsarClusterSpec {
*
* @return the list of external services to start with the cluster.
*/
- Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+ Map<String, GenericContainer<?>> externalServices = Collections.EMPTY_MAP;
/**
* Returns the flag whether to enable/disable container log.
*
* @return the flag whether to enable/disable container log.
*/
+ @Default
boolean enableContainerLog = false;
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index b88dee7..addb04a 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -34,26 +34,37 @@ public abstract class PulsarClusterTestBase {
@DataProvider(name = "ServiceUrlAndTopics")
public static Object[][] serviceUrlAndTopics() {
return new Object[][] {
- // plain text, persistent topic
- {
- pulsarCluster.getPlainTextServiceUrl(),
- true,
- },
- // plain text, non-persistent topic
- {
- pulsarCluster.getPlainTextServiceUrl(),
- false
- }
+ // plain text, persistent topic
+ {
+ pulsarCluster.getPlainTextServiceUrl(),
+ true,
+ },
+ // plain text, non-persistent topic
+ {
+ pulsarCluster.getPlainTextServiceUrl(),
+ false
+ }
};
}
@DataProvider(name = "ServiceUrls")
public static Object[][] serviceUrls() {
return new Object[][] {
- // plain text
- {
- pulsarCluster.getPlainTextServiceUrl()
- }
+ // plain text
+ {
+ pulsarCluster.getPlainTextServiceUrl()
+ }
+ };
+ }
+
+ @DataProvider(name = "ServiceAndAdminUrls")
+ public static Object[][] serviceAndAdminUrls() {
+ return new Object[][] {
+ // plain text
+ {
+ pulsarCluster.getPlainTextServiceUrl(),
+ pulsarCluster.getHttpServiceUrl()
+ }
};
}
@@ -66,24 +77,24 @@ public abstract class PulsarClusterTestBase {
public void setupCluster(String namePrefix) throws Exception {
String clusterName = Stream.of(this.getClass().getSimpleName(),
namePrefix, randomName(5))
- .filter(s -> s != null && !s.isEmpty())
- .collect(joining("-"));
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder =
PulsarClusterSpec.builder()
- .clusterName(clusterName);
+ .clusterName(clusterName);
setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
}
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
- String clusterName,
- PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
return specBuilder;
}
protected void setupCluster(PulsarClusterSpec spec) throws Exception {
log.info("Setting up cluster {} with {} bookies, {} brokers",
- spec.clusterName(), spec.numBookies(), spec.numBrokers());
+ spec.clusterName(), spec.numBookies(), spec.numBrokers());
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
@@ -116,11 +127,11 @@ public abstract class PulsarClusterTestBase {
protected static String generateTopicName(String namespace, String
topicPrefix, boolean isPersistent) {
String topicName = new StringBuilder(topicPrefix)
- .append("-")
- .append(randomName(8))
- .append("-")
- .append(System.currentTimeMillis())
- .toString();
+ .append("-")
+ .append(randomName(8))
+ .append("-")
+ .append(System.currentTimeMillis())
+ .toString();
if (isPersistent) {
return "persistent://public/" + namespace + "/" + topicName;
} else {
diff --git a/tests/integration/compaction/pom.xml
b/tests/integration/compaction/pom.xml
index c80836f..6ef6f57 100644
--- a/tests/integration/compaction/pom.xml
+++ b/tests/integration/compaction/pom.xml
@@ -38,10 +38,17 @@
<name>Apache Pulsar :: Tests :: Integration Tests :: Topic Compaction</name>
<dependencies>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>integration-tests-topologies</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index a716f9d..9bee227 100644
---
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -18,53 +18,31 @@
*/
package org.apache.pulsar.tests.integration;
-import com.github.dockerjava.api.DockerClient;
-
-import java.util.concurrent.TimeUnit;
-
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container;
import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestCompaction extends Arquillian {
- private static String clusterName = "test";
+import static java.util.stream.Collectors.joining;
- @ArquillianResource
- DockerClient docker;
+public class TestCompaction extends PulsarClusterTestBase {
- @BeforeMethod
- public void waitServicesUp() throws Exception {
- Assert.assertTrue(PulsarClusterUtils.waitZooKeeperUp(docker,
clusterName, 30, TimeUnit.SECONDS));
- Assert.assertTrue(PulsarClusterUtils.waitAllBrokersUp(docker,
clusterName));
- }
+ @Test(dataProvider = "ServiceUrls")
+ public void testPublishCompactAndConsumeCLI(String serviceUrl) throws
Exception {
+
+ final String tenant = "compaction-test-cli-" + randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
- @Test
- public void testPublishCompactAndConsumeCLI() throws Exception {
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN,
"tenants",
- "create", "compaction-test-cli",
"--allowed-clusters", clusterName,
- "--admin-roles", "admin");
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
- "create", "compaction-test-cli/ns1");
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
- "set-clusters", "--clusters", "test",
"compaction-test-cli/ns1");
-
- String brokerIp = DockerUtils.getContainerIP(
- docker, PulsarClusterUtils.proxySet(docker,
clusterName).stream().findAny().get());
- String serviceUrl = "pulsar://" + brokerIp + ":6650";
- String topic = "persistent://compaction-test-cli/ns1/topic1";
+ this.createNamespace(namespace);
try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -85,9 +63,7 @@ public class TestCompaction extends Arquillian {
Assert.assertEquals(m.getData(), "content1".getBytes());
}
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR,
"compact-topic",
- "-t", topic);
+ pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic",
"-t", topic);
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
@@ -98,23 +74,19 @@ public class TestCompaction extends Arquillian {
}
}
- @Test
- public void testPublishCompactAndConsumeRest() throws Exception {
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN,
"tenants",
- "create", "compaction-test-rest",
"--allowed-clusters", clusterName,
- "--admin-roles", "admin");
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
- "create", "compaction-test-rest/ns1");
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
- "set-clusters", "--clusters", "test",
"compaction-test-rest/ns1");
-
- String brokerIp = DockerUtils.getContainerIP(
- docker, PulsarClusterUtils.proxySet(docker,
clusterName).stream().findAny().get());
- String serviceUrl = "pulsar://" + brokerIp + ":6650";
- String topic = "persistent://compaction-test-rest/ns1/topic1";
+ @Test(dataProvider = "ServiceUrls")
+ public void testPublishCompactAndConsumeRest(String serviceUrl) throws
Exception {
+
+ final String tenant = "compaction-test-rest-" + randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+ this.createNamespace(namespace);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-clusters", "--clusters", pulsarCluster.getClusterName(),
namespace);
try (PulsarClient client = PulsarClient.create(serviceUrl)) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -134,12 +106,11 @@ public class TestCompaction extends Arquillian {
Assert.assertEquals(m.getKey(), "key0");
Assert.assertEquals(m.getData(), "content1".getBytes());
}
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "persistent", "compact",
topic);
+ pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+ "compact", topic);
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin",
"persistent", "compaction-status",
- "-w", topic);
+ pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+ "compaction-status", "-w", topic);
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
@@ -171,24 +142,19 @@ public class TestCompaction extends Arquillian {
}
}
- @Test
- public void testPublishWithAutoCompaction() throws Exception {
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN,
"tenants",
- "create", "compaction-test-auto",
- "--allowed-clusters", clusterName,
- "--admin-roles", "admin");
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
- "create", "--clusters", "test", "compaction-test-auto/ns1");
- PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
- "set-compaction-threshold", "--threshold", "1",
"compaction-test-auto/ns1");
-
- String brokerIp = DockerUtils.getContainerIP(
- docker, PulsarClusterUtils.proxySet(docker,
clusterName).stream().findAny().get());
- String serviceUrl = "pulsar://" + brokerIp + ":6650";
- String topic = "persistent://compaction-test-auto/ns1/topic1";
+ @Test(dataProvider = "ServiceUrls")
+ public void testPublishWithAutoCompaction(String serviceUrl) throws
Exception {
+
+ final String tenant = "compaction-test-auto-" + randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+ this.createNamespace(namespace);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-compaction-threshold", "--threshold", "1", namespace);
try (PulsarClient client = PulsarClient.create(serviceUrl)) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
@@ -207,4 +173,20 @@ public class TestCompaction extends Arquillian {
}
}
+ private Container.ExecResult createTenantName(final String tenantName,
+ final String
allowedClusterName,
+ final String adminRoleName)
throws Exception {
+ return pulsarCluster.runAdminCommandOnAnyBroker(
+ "tenants", "create", "--allowed-clusters", allowedClusterName,
+ "--admin-roles", adminRoleName, tenantName);
+ }
+
+ private Container.ExecResult createNamespace(final String Ns) throws
Exception {
+ return pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces",
+ "create",
+ "--clusters",
+ pulsarCluster.getClusterName(), Ns);
+ }
+
}
diff --git a/tests/integration/compaction/src/test/resources/arquillian.xml
b/tests/integration/compaction/src/test/resources/arquillian.xml
deleted file mode 100644
index 821d6e6..0000000
--- a/tests/integration/compaction/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://jboss.org/schema/arquillian"
- xsi:schemaLocation="http://jboss.org/schema/arquillian
-
http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
-
- <extension qualifier="docker">
- <property name="definitionFormat">CUBE</property>
- <property
name="dockerContainersResource">cube-definitions/single-cluster-3-bookie-2-broker.yaml</property>
- </extension>
-
-</arquillian>
diff --git a/tests/integration/s3-offload/pom.xml
b/tests/integration/s3-offload/pom.xml
index c32a996..2dbe1bb 100644
--- a/tests/integration/s3-offload/pom.xml
+++ b/tests/integration/s3-offload/pom.xml
@@ -40,11 +40,6 @@
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin-original</artifactId>
<version>${project.version}</version>
</dependency>
@@ -53,5 +48,11 @@
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>integration-tests-topologies</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index 4248976..0e8f48f 100644
---
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.tests.integration;
-import com.github.dockerjava.api.DockerClient;
-import com.google.common.collect.ImmutableMap;
-
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableMap;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -33,58 +36,61 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.DockerUtils;
-import org.apache.pulsar.tests.PulsarClusterUtils;
-import org.jboss.arquillian.test.api.ArquillianResource;
-import org.jboss.arquillian.testng.Arquillian;
+import org.apache.pulsar.tests.containers.BrokerContainer;
+import org.apache.pulsar.tests.containers.S3Container;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import static java.util.stream.Collectors.joining;
-public class TestS3Offload extends Arquillian {
- private static final Logger log =
LoggerFactory.getLogger(TestS3Offload.class);
+@Slf4j
+public class TestS3Offload extends PulsarClusterTestBase {
- private static final String CLUSTER_NAME = "test";
private static final int ENTRY_SIZE = 1024;
private static final int ENTRIES_PER_LEDGER = 1024;
- @ArquillianResource
- DockerClient docker;
-
- @BeforeMethod
- public void configureAndStartBrokers() throws Exception {
-
- String s3ip = DockerUtils.cubeIdsWithLabels(
- docker, ImmutableMap.of("service", "s3", "cluster",
CLUSTER_NAME))
- .stream().map((c) -> DockerUtils.getContainerIP(docker,
c)).findFirst().get();
-
- String brokerConfFile = "/pulsar/conf/broker.conf";
- for (String b : PulsarClusterUtils.brokerSet(docker, CLUSTER_NAME)) {
- PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
- "managedLedgerMaxEntriesPerLedger",
String.valueOf(ENTRIES_PER_LEDGER));
- PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
- "managedLedgerMinLedgerRolloverTimeMinutes", "0");
- PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
- "managedLedgerOffloadDriver", "s3");
- PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
- "s3ManagedLedgerOffloadBucket", "pulsar-integtest");
- PulsarClusterUtils.updateConf(docker, b, brokerConfFile,
- "s3ManagedLedgerOffloadServiceEndpoint", "http://" + s3ip
+ ":9090");
+ @Override
+ @BeforeClass
+ public void setupCluster() throws Exception {
+
+ final String clusterName = Stream.of(this.getClass().getSimpleName(),
randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
+
+ PulsarClusterSpec spec = PulsarClusterSpec.builder()
+ .numBookies(2)
+ .numBrokers(1)
+ .externalServices(ImmutableMap.of(S3Container.NAME, new
S3Container(clusterName, S3Container.NAME)))
+ .clusterName(clusterName)
+ .build();
+
+ log.info("Setting up cluster {} with {} bookies, {} brokers",
+ spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+ pulsarCluster = PulsarCluster.forSpec(spec);
+
+ for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
+ brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger",
String.valueOf(ENTRIES_PER_LEDGER));
+
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+ brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
+ brokerContainer.withEnv("s3ManagedLedgerOffloadBucket",
"pulsar-integtest");
+ brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint",
"http://" + S3Container.NAME + ":9090");
}
- Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker,
CLUSTER_NAME));
- Assert.assertTrue(PulsarClusterUtils.startAllProxies(docker,
CLUSTER_NAME));
- }
+ pulsarCluster.start();
- @AfterMethod
- public void teardownBrokers() throws Exception {
- PulsarClusterUtils.stopAllProxies(docker, CLUSTER_NAME);
- Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker,
CLUSTER_NAME));
+ log.info("Cluster {} is setup", spec.clusterName());
}
private static byte[] buildEntry(String pattern) {
@@ -97,165 +103,151 @@ public class TestS3Offload extends Arquillian {
return entry;
}
- @Test
- public void testPublishOffloadAndConsumeViaCLI() throws Exception {
- final String TENANT = "s3-offload-test-cli";
- final String NAMESPACE = "s3-offload-test-cli/ns1";
- final String TOPIC = "persistent://s3-offload-test-cli/ns1/topic1";
-
- PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
- "/pulsar/bin/pulsar-admin", "tenants",
- "create", "--allowed-clusters", CLUSTER_NAME,
- "--admin-roles", "offload-admin", TENANT);
- PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
- "/pulsar/bin/pulsar-admin", "namespaces",
- "create", "--clusters", CLUSTER_NAME, NAMESPACE);
-
- String broker = PulsarClusterUtils.brokerSet(docker,
CLUSTER_NAME).stream().findFirst().get();
- String proxyIp = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
- .stream().map((c) -> DockerUtils.getContainerIP(docker,
c)).findFirst().get();
- String serviceUrl = "pulsar://" + proxyIp + ":6650";
- String adminUrl = "http://" + proxyIp + ":8080";
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String
adminUrl) throws Exception {
+ final String tenant = "s3-offload-test-cli-" + randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
+ "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+ "--admin-roles", "offload-admin", tenant);
+
+ pulsarCluster.runAdminCommandOnAnyBroker(
+ "namespaces",
+ "create", "--clusters", pulsarCluster.getClusterName(), namespace);
long firstLedger = -1;
try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer producer = client.newProducer().topic(TOPIC)
+ Producer producer = client.newProducer().topic(topic)
.blockIfQueueFull(true).enableBatching(false).create();
PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
// write enough to topic to make it roll
int i = 0;
- for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
- producer.sendAsync(buildEntry("offload-message"+i));
+ for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
+ producer.sendAsync(buildEntry("offload-message" + i));
}
- MessageId latestMessage =
producer.send(buildEntry("offload-message"+i));
+ MessageId latestMessage =
producer.send(buildEntry("offload-message" + i));
// read managed ledger info, check ledgers exist
- firstLedger =
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+ firstLedger =
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
// first offload with a high threshold, nothing should offload
- String output = DockerUtils.runCommand(docker, broker,
- "/pulsar/bin/pulsar-admin", "topics",
- "offload", "--size-threshold", "100G", TOPIC);
+
+ String output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "offload", "--size-threshold", "100G", topic).getStdout();
Assert.assertTrue(output.contains("Nothing to offload"));
- output = DockerUtils.runCommand(docker, broker,
- "/pulsar/bin/pulsar-admin", "topics", "offload-status",
TOPIC);
+ output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
+ "offload-status", topic).getStdout();
Assert.assertTrue(output.contains("Offload has not been run"));
// offload with a low threshold
- output = DockerUtils.runCommand(docker, broker,
- "/pulsar/bin/pulsar-admin", "topics",
- "offload", "--size-threshold", "1M", TOPIC);
+ output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "offload", "--size-threshold", "1M", topic).getStdout();
Assert.assertTrue(output.contains("Offload triggered"));
- output = DockerUtils.runCommand(docker, broker,
- "/pulsar/bin/pulsar-admin", "topics", "offload-status",
"-w", TOPIC);
+ output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "offload-status", "-w", topic).getStdout();
Assert.assertTrue(output.contains("Offload was a success"));
}
// stop brokers to clear all caches, open handles, etc
- Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker,
CLUSTER_NAME));
+ pulsarCluster.stopAllBrokers();
// delete the first ledger, so that we cannot possibly read from it
ClientConfiguration bkConf = new ClientConfiguration();
- bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker,
CLUSTER_NAME));
+ bkConf.setZkServers(pulsarCluster.getZKConnString());
try (BookKeeper bk = new BookKeeper(bkConf)) {
bk.deleteLedger(firstLedger);
}
// start all brokers again
- Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker,
CLUSTER_NAME));
+ pulsarCluster.startAllBrokers();
log.info("Read back the data (which would be in that first ledger)");
try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
- Consumer consumer =
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+ Consumer consumer =
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
// read back from topic
- for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
Message m = consumer.receive(1, TimeUnit.MINUTES);
- Assert.assertEquals(buildEntry("offload-message"+i),
m.getData());
+ Assert.assertEquals(buildEntry("offload-message" + i),
m.getData());
}
}
}
- @Test
- public void testPublishOffloadAndConsumeViaThreshold() throws Exception {
- final String TENANT = "s3-offload-test-threshold";
- final String NAMESPACE = "s3-offload-test-threshold/ns1";
- final String TOPIC =
"persistent://s3-offload-test-threshold/ns1/topic1";
-
- PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
- "/pulsar/bin/pulsar-admin", "tenants",
- "create", "--allowed-clusters", CLUSTER_NAME,
- "--admin-roles", "offload-admin", TENANT);
- PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
- "/pulsar/bin/pulsar-admin", "namespaces",
- "create", "--clusters", CLUSTER_NAME, NAMESPACE);
- PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
- "/pulsar/bin/pulsar-admin", "namespaces",
- "set-offload-threshold", "--size", "1M", NAMESPACE);
-
- String proxyIp = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
- .stream().map((c) -> DockerUtils.getContainerIP(docker,
c)).findFirst().get();
- String serviceUrl = "pulsar://" + proxyIp + ":6650";
- String adminUrl = "http://" + proxyIp + ":8080";
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl,
String adminUrl) throws Exception {
+ final String tenant = "s3-offload-test-threshold-" + randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace + "/topic1";
+
+ pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+ "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+ "--admin-roles", "offload-admin", tenant);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "create", "--clusters", pulsarCluster.getClusterName(),
namespace);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-offload-threshold", "--size", "1M", namespace);
long firstLedger = 0;
try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer producer = client.newProducer().topic(TOPIC)
+ Producer producer = client.newProducer().topic(topic)
.blockIfQueueFull(true).enableBatching(false).create();
PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
// write enough to topic to make it roll twice
- for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
- producer.sendAsync(buildEntry("offload-message"+i));
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+ producer.sendAsync(buildEntry("offload-message" + i));
}
producer.send(buildEntry("final-offload-message"));
- firstLedger =
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+ firstLedger =
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
// wait up to 30 seconds for offload to occur
- for (int i = 0; i < 300 &&
!admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded; i++) {
+ for (int i = 0; i < 300 &&
!admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
Thread.sleep(100);
}
-
Assert.assertTrue(admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded);
+
Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
}
// stop brokers to clear all caches, open handles, etc
- Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker,
CLUSTER_NAME));
+ pulsarCluster.stopAllBrokers();
// delete the first ledger, so that we cannot possibly read from it
ClientConfiguration bkConf = new ClientConfiguration();
- bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker,
CLUSTER_NAME));
+ bkConf.setZkServers(pulsarCluster.getZKConnString());
try (BookKeeper bk = new BookKeeper(bkConf)) {
bk.deleteLedger(firstLedger);
}
// start all brokers again
- Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker,
CLUSTER_NAME));
+ pulsarCluster.startAllBrokers();
log.info("Read back the data (which would be in that first ledger)");
try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
- Consumer consumer =
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+ Consumer consumer =
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
// read back from topic
- for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+ for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
Message m = consumer.receive(1, TimeUnit.MINUTES);
- Assert.assertEquals(buildEntry("offload-message"+i),
m.getData());
+ Assert.assertEquals(buildEntry("offload-message" + i),
m.getData());
}
}
// try disabling
- PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
- "/pulsar/bin/pulsar-admin", "namespaces",
- "set-offload-threshold", "--size", "-1", NAMESPACE);
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-offload-threshold", "--size", "-1", namespace);
// hard to validate that it has been disabled as we'd be waiting for
// something _not_ to happen (i.e. waiting for ages), so just check
try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-
Assert.assertEquals(admin.namespaces().getOffloadThreshold(NAMESPACE), -1L);
+
Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
}
}
}
diff --git a/tests/integration/s3-offload/src/test/resources/arquillian.xml
b/tests/integration/s3-offload/src/test/resources/arquillian.xml
deleted file mode 100644
index 14714d2..0000000
--- a/tests/integration/s3-offload/src/test/resources/arquillian.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://jboss.org/schema/arquillian"
- xsi:schemaLocation="http://jboss.org/schema/arquillian
-
http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
-
- <extension qualifier="docker">
- <property name="definitionFormat">CUBE</property>
- <property
name="dockerContainersResource">cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml</property>
- </extension>
-
-</arquillian>