This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d1cf20 Introduce testcontainer based cluster integration tests
(#2055)
4d1cf20 is described below
commit 4d1cf20c6d52ad8491eff0fdd9f1ea4793134999
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Jun 30 17:22:01 2018 -0700
Introduce testcontainer based cluster integration tests (#2055)
* Introduce testcontainer cluster tests
*Motivation*
Introduce testcontainers based cluster tests to replace the existing
arquillian tests.
testcontainers is more flexible and more natural to how we write unit tests.
arquillian is configuration based, which makes it hard to change the cluster
at runtime. With testcontainers it is possible to add bookies/brokers/proxies
at test runtime.
The current arquillian based integration tests can only run in a linux
environment (which uses the host network). It is impossible to run the
arquillian tests in a mac environment, where the host network is meaningless.
Changing to testcontainers allows running the integration tests on mac, which
speeds up development.
*Changes*
This mirrors the exact same setup as existing arquillian tests - 1 zk
container, 1 cs container, 3 bk containers, 2 broker containers and 1 proxy
container.
Add a semantic integration test module that uses the new testcontainers test
base `PulsarClusterTestBase`.
*Result*
- both arquillian and testcontainers exist.
- new integration tests are encouraged to be written using
`PulsarClusterTestBase`. It is just as easy as how you write normal unit tests.
- existing arquillian tests will be migrated in future PRs.
* all modules under `tests/integration` should only be run after
`-DintegrationTests` is specified
---
pom.xml | 27 ++++
tests/integration-tests-topologies/pom.xml | 16 ++
.../pulsar/tests/containers/BKContainer.java | 30 ++++
.../pulsar/tests/containers/BrokerContainer.java | 30 ++++
.../pulsar/tests/containers/CSContainer.java | 37 +++++
.../pulsar/tests/containers/ChaosContainer.java | 116 ++++++++++++++
.../pulsar/tests/containers/ProxyContainer.java | 38 +++++
.../pulsar/tests/containers/PulsarContainer.java | 111 +++++++++++++
.../pulsar/tests/containers/ZKContainer.java | 38 +++++
.../pulsar/tests/containers/package-info.java | 22 +++
.../pulsar/tests/topologies/PulsarCluster.java | 172 +++++++++++++++++++++
.../pulsar/tests/topologies/PulsarClusterSpec.java | 74 +++++++++
.../tests/topologies/PulsarClusterTestBase.java | 61 ++++++++
tests/integration-tests-utils/pom.xml | 7 -
tests/integration/pom.xml | 50 ++++++
.../semantics}/pom.xml | 23 ++-
.../tests/integration/semantics/SemanticsTest.java | 73 +++++++++
17 files changed, 913 insertions(+), 12 deletions(-)
diff --git a/pom.xml b/pom.xml
index 37ed8f9..5acf556 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,9 @@ flexible messaging model and an intuitive client
API.</description>
<testRealAWS>false</testRealAWS>
<testRetryCount>1</testRetryCount>
+ <!-- apache commons -->
+ <commons-compress.version>1.15</commons-compress.version>
+
<bookkeeper.version>4.7.1</bookkeeper.version>
<zookeeper.version>3.5.4-beta</zookeeper.version>
<netty.version>4.1.22.Final</netty.version>
@@ -155,7 +158,10 @@ flexible messaging model and an intuitive client
API.</description>
<avro.version>1.8.2</avro.version>
<!-- test dependencies -->
+ <arquillian-cube.version>1.15.1</arquillian-cube.version>
+ <arquillian-junit.version>1.1.14.Final</arquillian-junit.version>
<disruptor.version>3.4.0</disruptor.version>
+ <testcontainers.version>1.8.0</testcontainers.version>
<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version>
@@ -391,6 +397,12 @@ flexible messaging model and an intuitive client
API.</description>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons-compress.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
@@ -783,6 +795,21 @@ flexible messaging model and an intuitive client
API.</description>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.arquillian.cube</groupId>
+ <artifactId>arquillian-cube-docker</artifactId>
+ <version>${arquillian-cube.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.arquillian.junit</groupId>
+ <artifactId>arquillian-junit-standalone</artifactId>
+ <version>${arquillian-junit.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/tests/integration-tests-topologies/pom.xml
b/tests/integration-tests-topologies/pom.xml
index 93fc1fe..4b84adf 100644
--- a/tests/integration-tests-topologies/pom.xml
+++ b/tests/integration-tests-topologies/pom.xml
@@ -36,5 +36,21 @@
<packaging>jar</packaging>
<name>Apache Pulsar :: Tests :: Common topologies for Arquillian based
integration tests</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>integration-tests-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BKContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BKContainer.java
new file mode 100644
index 0000000..1b87ad9
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BKContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs a bookkeeper bookie.
+ *
+ * <p>It uses {@code bin/run-bookie.sh} as the service entrypoint and
+ * {@code BOOKIE_PORT} as the service port; no http port is configured.
+ */
+public class BKContainer extends PulsarContainer<BKContainer> {
+
+ /**
+ * Creates a bookie container.
+ *
+ * @param clusterName name of the cluster the container belongs to
+ * @param hostName passed to the parent constructor as both hostname and service name
+ */
+ public BKContainer(String clusterName, String hostName) {
+ super(
+ clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT,
INVALID_PORT);
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BrokerContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BrokerContainer.java
new file mode 100644
index 0000000..28668e2
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/BrokerContainer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs a pulsar broker.
+ *
+ * <p>It uses {@code bin/run-broker.sh} as the service entrypoint and
+ * {@code BROKER_PORT} as the service port; no http port is configured.
+ */
+public class BrokerContainer extends PulsarContainer<BrokerContainer> {
+
+ /**
+ * Creates a broker container.
+ *
+ * @param clusterName name of the cluster the container belongs to
+ * @param hostName passed to the parent constructor as both hostname and service name
+ */
+ public BrokerContainer(String clusterName, String hostName) {
+ super(
+ clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT,
INVALID_PORT);
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CSContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CSContainer.java
new file mode 100644
index 0000000..0f228ea
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CSContainer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs the configuration store.
+ *
+ * <p>It uses {@code bin/run-global-zk.sh} as the service entrypoint and
+ * {@code CS_PORT} as the service port; no http port is configured.
+ */
+public class CSContainer extends PulsarContainer<CSContainer> {
+
+ /** Name used as both the hostname and the service name of this container. */
+ public static final String NAME = "configuration-store";
+
+ /**
+ * Creates a configuration store container.
+ *
+ * @param clusterName name of the cluster the container belongs to
+ */
+ public CSContainer(String clusterName) {
+ super(
+ clusterName,
+ NAME,
+ NAME,
+ "bin/run-global-zk.sh",
+ CS_PORT,
+ INVALID_PORT);
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ChaosContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ChaosContainer.java
new file mode 100644
index 0000000..cd27960
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ChaosContainer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.github.dockerjava.api.command.LogContainerCmd;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.core.command.LogContainerResultCallback;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.testcontainers.containers.GenericContainer;
+
+/**
+ * A base container that provides chaos capability.
+ *
+ * <p>On top of {@link GenericContainer} it remembers the name of the cluster the container
+ * belongs to and offers helpers to tail or fetch the container log and to execute commands
+ * inside the container.
+ */
+@Slf4j
+public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends GenericContainer<SelfT> {
+
+    protected final String clusterName;
+
+    /**
+     * Creates a container for the given cluster.
+     *
+     * @param clusterName name of the cluster the container belongs to
+     * @param image docker image to run
+     */
+    protected ChaosContainer(String clusterName, String image) {
+        super(image);
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * Asynchronously follows the container's stdout/stderr and writes every frame to the logger.
+     *
+     * <p>The background task polls every 100ms until the container id is known and only then
+     * attaches to the log stream. If the polling thread is interrupted, the task gives up and
+     * restores the interrupt flag.
+     */
+    public void tailContainerLog() {
+        CompletableFuture.runAsync(() -> {
+            while (null == containerId) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to the owner of this thread
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+
+            LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(containerId);
+            logContainerCmd.withStdOut(true).withStdErr(true).withFollowStream(true);
+            logContainerCmd.exec(new LogContainerResultCallback() {
+                @Override
+                public void onNext(Frame item) {
+                    log.info(new String(item.getPayload(), UTF_8));
+                }
+            });
+        });
+    }
+
+    /**
+     * Fetches the stdout and stderr of the container.
+     *
+     * <p>Blocks until the log command completes. If the calling thread is interrupted while
+     * waiting, the interrupt flag is restored and whatever has been collected so far is
+     * returned.
+     *
+     * @return the container log decoded as UTF-8
+     */
+    public String getContainerLog() {
+        StringBuilder sb = new StringBuilder();
+
+        LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(containerId);
+        logContainerCmd.withStdOut(true).withStdErr(true);
+        try {
+            logContainerCmd.exec(new LogContainerResultCallback() {
+                @Override
+                public void onNext(Frame item) {
+                    sb.append(new String(item.getPayload(), UTF_8));
+                }
+            }).awaitCompletion();
+        } catch (InterruptedException e) {
+            // do not swallow the interrupt: restore the flag and return the partial log
+            Thread.currentThread().interrupt();
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Executes a command inside the container and logs its stdout and stderr.
+     *
+     * @param cmd the command and its arguments
+     * @return the result of the execution
+     * @throws Exception if the command cannot be executed in the container
+     */
+    public ExecResult execCmd(String... cmd) throws Exception {
+        String cmdString = StringUtils.join(cmd, " ");
+
+        log.info("DOCKER.exec({}:{}): Executing ...", containerId, cmdString);
+
+        ExecResult result = execInContainer(cmd);
+
+        log.info("Docker.exec({}:{}): Done", containerId, cmdString);
+        log.info("Docker.exec({}:{}): Stdout -\n{}", containerId, cmdString, result.getStdout());
+        log.info("Docker.exec({}:{}): Stderr -\n{}", containerId, cmdString, result.getStderr());
+
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ChaosContainer)) {
+            return false;
+        }
+
+        ChaosContainer<?> another = (ChaosContainer<?>) o;
+        // null-safe comparison, consistent with the null-safe Objects.hash in hashCode()
+        return Objects.equals(clusterName, another.clusterName)
+            && super.equals(another);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Objects.hash(
+            clusterName);
+    }
+
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ProxyContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ProxyContainer.java
new file mode 100644
index 0000000..3290141
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ProxyContainer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs a pulsar proxy.
+ *
+ * <p>It uses {@code bin/run-proxy.sh} as the service entrypoint, {@code BROKER_PORT} as the
+ * service port and {@code BROKER_HTTP_PORT} as the http port.
+ */
+public class ProxyContainer extends PulsarContainer<ProxyContainer> {
+
+ /**
+ * Creates a proxy container.
+ *
+ * @param clusterName name of the cluster the container belongs to
+ * @param hostName passed to the parent constructor as both hostname and service name
+ */
+ public ProxyContainer(String clusterName, String hostName) {
+ super(
+ clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT,
BROKER_HTTP_PORT);
+ }
+
+ /**
+ * Returns the {@code pulsar://} service url built from the container ip address and the
+ * mapped port of {@code BROKER_PORT}.
+ *
+ * @return the plain text binary service url
+ */
+ public String getPlainTextServiceUrl() {
+ return "pulsar://" + getContainerIpAddress() + ":" +
getMappedPort(BROKER_PORT);
+ }
+
+ /**
+ * Returns the {@code http://} service url built from the container ip address and the
+ * mapped port of {@code BROKER_HTTP_PORT}.
+ *
+ * @return the http service url
+ */
+ public String getHttpServiceUrl() {
+ return "http://" + getContainerIpAddress() + ":" +
getMappedPort(BROKER_HTTP_PORT);
+ }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
new file mode 100644
index 0000000..eb0ae84
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/PulsarContainer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+import java.time.Duration;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Abstract Test Container for Pulsar.
+ *
+ * <p>Every pulsar service container runs the same test image; the concrete service is selected
+ * by the entrypoint script handed to the constructor. A port value that is not positive
+ * (for example {@link #INVALID_PORT}) means the corresponding port is not exposed.
+ */
+@Slf4j
+public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> extends ChaosContainer<SelfT> {
+
+    public static final int INVALID_PORT = -1;
+    public static final int ZK_PORT = 2181;
+    public static final int CS_PORT = 2184;
+    public static final int BOOKIE_PORT = 3181;
+    public static final int BROKER_PORT = 6650;
+    public static final int BROKER_HTTP_PORT = 8080;
+
+    private static final String IMAGE_NAME = "apachepulsar/pulsar-test-latest-version:latest";
+
+    private final String hostname;
+    private final String serviceName;
+    private final String serviceEntrypoint;
+    private final int servicePort;
+    private final int httpPort;
+
+    /**
+     * Creates a pulsar service container.
+     *
+     * @param clusterName name of the cluster the container belongs to
+     * @param hostname hostname assigned to the container
+     * @param serviceName name of the service, used in log messages
+     * @param serviceEntrypoint entrypoint script that launches the service
+     * @param servicePort service port to expose, or a non-positive value for none
+     * @param httpPort http port to expose, or a non-positive value for none
+     */
+    public PulsarContainer(String clusterName,
+                           String hostname,
+                           String serviceName,
+                           String serviceEntrypoint,
+                           int servicePort,
+                           int httpPort) {
+        super(clusterName, IMAGE_NAME);
+        this.hostname = hostname;
+        this.serviceName = serviceName;
+        this.serviceEntrypoint = serviceEntrypoint;
+        this.servicePort = servicePort;
+        this.httpPort = httpPort;
+    }
+
+    /**
+     * Returns the docker container name, which is {@code <clusterName>-<hostname>}.
+     */
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + hostname;
+    }
+
+    /**
+     * Exposes the service port together with the http port when an http port is configured,
+     * otherwise only the service port when one is configured.
+     */
+    @Override
+    protected void configure() {
+        if (httpPort > 0) {
+            addExposedPorts(
+                servicePort, httpPort
+            );
+        } else if (servicePort > 0) {
+            addExposedPort(servicePort);
+        }
+    }
+
+    /**
+     * Starts the container with the configured hostname, container name and entrypoint.
+     *
+     * <p>When at least one port is configured, startup waits up to 60 seconds for the
+     * host port wait strategy to succeed.
+     */
+    @Override
+    public void start() {
+        if (httpPort > 0 || servicePort > 0) {
+            this.waitStrategy = new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.of(60, SECONDS));
+        }
+        this.withCreateContainerCmdModifier(createContainerCmd -> {
+            createContainerCmd.withHostName(hostname);
+            createContainerCmd.withName(getContainerName());
+            createContainerCmd.withEntrypoint(serviceEntrypoint);
+        });
+
+        super.start();
+        log.info("Start pulsar service {} at container {}", serviceName, containerName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof PulsarContainer)) {
+            return false;
+        }
+
+        PulsarContainer<?> another = (PulsarContainer<?>) o;
+        // null-safe comparison, consistent with the null-safe Objects.hash in hashCode()
+        return Objects.equals(containerName, another.containerName)
+            && super.equals(another);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Objects.hash(
+            containerName);
+    }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ZKContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ZKContainer.java
new file mode 100644
index 0000000..c6524f9
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/ZKContainer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.containers;
+
+/**
+ * A pulsar container that runs zookeeper.
+ *
+ * <p>It uses {@code bin/run-local-zk.sh} as the service entrypoint and
+ * {@code ZK_PORT} as the service port; no http port is configured.
+ */
+public class ZKContainer extends PulsarContainer<ZKContainer> {
+
+ /** Name used as both the hostname and the service name of this container. */
+ public static final String NAME = "zookeeper";
+
+ /**
+ * Creates a zookeeper container.
+ *
+ * @param clusterName name of the cluster the container belongs to
+ */
+ public ZKContainer(String clusterName) {
+ super(
+ clusterName,
+ NAME,
+ NAME,
+ "bin/run-local-zk.sh",
+ ZK_PORT,
+ INVALID_PORT);
+ }
+
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/package-info.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/package-info.java
new file mode 100644
index 0000000..2e0ad98
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Test containers used for running integration tests.
+ */
+package org.apache.pulsar.tests.containers;
\ No newline at end of file
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
new file mode 100644
index 0000000..a30509d
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.topologies;
+
+import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.BKContainer;
+import org.apache.pulsar.tests.containers.BrokerContainer;
+import org.apache.pulsar.tests.containers.CSContainer;
+import org.apache.pulsar.tests.containers.ProxyContainer;
+import org.apache.pulsar.tests.containers.PulsarContainer;
+import org.apache.pulsar.tests.containers.ZKContainer;
+import org.testcontainers.containers.Network;
+
+/**
+ * Pulsar Cluster in containers.
+ *
+ * <p>A cluster consists of one zookeeper container, one configuration store container, the
+ * number of bookie and broker containers requested by the {@link PulsarClusterSpec}, and one
+ * proxy container. All containers are attached to a network created for the cluster.
+ */
+@Slf4j
+public class PulsarCluster {
+
+    /**
+     * Builds a pulsar cluster from the given spec. The cluster is not started.
+     *
+     * @param spec pulsar cluster spec.
+     * @return the built pulsar cluster
+     */
+    public static PulsarCluster forSpec(PulsarClusterSpec spec) {
+        return new PulsarCluster(spec);
+    }
+
+    private final PulsarClusterSpec spec;
+    @Getter
+    private final String clusterName;
+    private final Network network;
+    private final ZKContainer zkContainer;
+    private final CSContainer csContainer;
+    private final Map<String, BKContainer> bookieContainers;
+    private final Map<String, BrokerContainer> brokerContainers;
+    private final ProxyContainer proxyContainer;
+
+    private PulsarCluster(PulsarClusterSpec spec) {
+        this.spec = spec;
+        this.clusterName = spec.clusterName();
+        this.network = Network.newNetwork();
+        this.zkContainer = new ZKContainer(clusterName)
+            .withNetwork(network)
+            .withNetworkAliases(ZKContainer.NAME)
+            .withEnv("clusterName", clusterName)
+            .withEnv("zkServers", ZKContainer.NAME)
+            .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
+            .withEnv("pulsarNode", "pulsar-broker-0");
+
+        this.csContainer = new CSContainer(clusterName)
+            .withNetwork(network)
+            .withNetworkAliases(CSContainer.NAME);
+        // bookies and brokers are created lazily in start()
+        this.bookieContainers = Maps.newTreeMap();
+        this.brokerContainers = Maps.newTreeMap();
+        this.proxyContainer = new ProxyContainer(clusterName, "pulsar-proxy")
+            .withNetwork(network)
+            .withNetworkAliases("pulsar-proxy")
+            .withEnv("zookeeperServers", ZKContainer.NAME)
+            .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
+            .withEnv("clusterName", clusterName);
+    }
+
+    /**
+     * Returns the binary service url exposed by the proxy container.
+     *
+     * @return the {@code pulsar://} service url
+     */
+    public String getPlainTextServiceUrl() {
+        return proxyContainer.getPlainTextServiceUrl();
+    }
+
+    /**
+     * Returns the http service url exposed by the proxy container.
+     *
+     * @return the {@code http://} service url
+     */
+    public String getHttpServiceUrl() {
+        return proxyContainer.getHttpServiceUrl();
+    }
+
+    /**
+     * Starts the cluster: zookeeper, configuration store, cluster initialization, bookies,
+     * brokers and finally the proxy, in that order.
+     *
+     * @throws Exception if the cluster initialization command cannot be executed
+     */
+    public void start() throws Exception {
+        // start the local zookeeper
+        zkContainer.start();
+        log.info("Successfully started local zookeeper container.");
+
+        // start the configuration store
+        csContainer.start();
+        log.info("Successfully started configuration store container.");
+
+        // init the cluster
+        zkContainer.execCmd(
+            "bin/init-cluster.sh");
+        log.info("Successfully initialized the cluster.");
+
+        // create bookies
+        bookieContainers.putAll(
+            runNumContainers("bookie", spec.numBookies(), (name) -> new BKContainer(clusterName, name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                .withEnv("zkServers", ZKContainer.NAME)
+                .withEnv("useHostNameAsBookieID", "true")
+                .withEnv("clusterName", clusterName)
+            )
+        );
+
+        // create brokers
+        brokerContainers.putAll(
+            runNumContainers("broker", spec.numBrokers(), (name) -> new BrokerContainer(clusterName, name)
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                .withEnv("zookeeperServers", ZKContainer.NAME)
+                .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
+                .withEnv("clusterName", clusterName)
+                .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+            )
+        );
+
+        // create proxy
+        proxyContainer.start();
+        log.info("Successfully started pulsar proxy.");
+
+        log.info("Pulsar cluster {} is up running:", clusterName);
+        log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
+        log.info("\tHttp Service Url : {}", getHttpServiceUrl());
+    }
+
+    /**
+     * Creates {@code numContainers} containers named {@code pulsar-<serviceName>-<index>},
+     * starts them concurrently and waits until all of them have started.
+     *
+     * @param serviceName service name used to build the container names
+     * @param numContainers number of containers to create
+     * @param containerCreator creates a container from its name
+     * @return the started containers, keyed by container name
+     */
+    private static <T extends PulsarContainer<?>> Map<String, T> runNumContainers(String serviceName,
+                                                                                  int numContainers,
+                                                                                  Function<String, T> containerCreator) {
+        List<CompletableFuture<?>> startFutures = Lists.newArrayList();
+        Map<String, T> containers = Maps.newTreeMap();
+        for (int i = 0; i < numContainers; i++) {
+            String name = "pulsar-" + serviceName + "-" + i;
+            T container = containerCreator.apply(name);
+            containers.put(name, container);
+            startFutures.add(CompletableFuture.runAsync(container::start));
+        }
+        // join() rethrows the failure of any container start
+        CompletableFuture.allOf(startFutures.toArray(new CompletableFuture[startFutures.size()])).join();
+        log.info("Successfully started {} {} containers", numContainers, serviceName);
+        return containers;
+    }
+
+    /**
+     * Stops the cluster in the reverse order of {@link #start()} and closes the network.
+     * A failure to close the network is logged and otherwise ignored.
+     */
+    public void stop() {
+        proxyContainer.stop();
+        brokerContainers.values().forEach(BrokerContainer::stop);
+        bookieContainers.values().forEach(BKContainer::stop);
+        csContainer.stop();
+        zkContainer.stop();
+        try {
+            network.close();
+        } catch (Exception e) {
+            // best effort: a failure is worth a warning, but must not fail the teardown
+            log.warn("Failed to shutdown network for pulsar cluster {}", clusterName, e);
+        }
+    }
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
new file mode 100644
index 0000000..85c9f51
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterSpec.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.topologies;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Spec to build a pulsar cluster.
+ *
+ * <p>Instances are created through the generated builder; accessors are fluent, e.g.
+ * {@code spec.numBookies()}.
+ */
+@Builder
+@Accessors(fluent = true)
+@Getter
+@Setter
+public class PulsarClusterSpec {
+
+    /**
+     * Returns the cluster name.
+     *
+     * @return the cluster name.
+     */
+    String clusterName;
+
+    /**
+     * Returns number of bookies.
+     *
+     * @return number of bookies.
+     */
+    @Default
+    int numBookies = 3;
+
+    /**
+     * Returns number of brokers.
+     *
+     * @return number of brokers.
+     */
+    @Default
+    int numBrokers = 2;
+
+    /**
+     * Returns number of proxies.
+     *
+     * @return number of proxies.
+     */
+    @Default
+    int numProxies = 1;
+
+    /**
+     * Returns the flag whether to enable/disable container log.
+     *
+     * @return the flag whether to enable/disable container log.
+     */
+    // marked @Default like the other initialized fields so the builder honors the initializer
+    @Default
+    boolean enableContainerLog = false;
+
+}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
new file mode 100644
index 0000000..bee31b8
--- /dev/null
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.topologies;
+
+import java.util.concurrent.ThreadLocalRandom;
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Base class for tests that run against a {@link PulsarCluster}.
+ *
+ * <p>A cluster is built and started before the test class runs and stopped
+ * after it; subclasses reach it through the static {@code pulsarCluster}
+ * field.
+ */
+@Slf4j
+public class PulsarClusterTestBase {
+
+    /** Number of random lowercase letters in the generated cluster name. */
+    private static final int CLUSTER_NAME_LENGTH = 8;
+
+    /** The cluster under test; {@code null} before setup and after teardown. */
+    protected static PulsarCluster pulsarCluster;
+
+    /**
+     * Starts a cluster built from the default {@link PulsarClusterSpec} with a
+     * randomly generated cluster name.
+     *
+     * @throws Exception if the cluster fails to start
+     */
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .clusterName(randomClusterName(CLUSTER_NAME_LENGTH))
+            .build();
+
+        setupCluster(spec);
+    }
+
+    /**
+     * Builds the cluster described by {@code spec}, stores it in
+     * {@code pulsarCluster} and starts it.
+     *
+     * @param spec the spec describing the cluster to start
+     * @throws Exception if the cluster fails to start
+     */
+    protected static void setupCluster(PulsarClusterSpec spec) throws Exception {
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+            spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        // Assign before start() so teardownCluster() can still stop the
+        // cluster if start() fails part-way.
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    /**
+     * Stops the cluster if one was created.
+     *
+     * <p>{@code alwaysRun = true} makes TestNG invoke this even when setup
+     * failed, so that the cluster assigned in {@code setupCluster} is still
+     * stopped.
+     */
+    @AfterClass(alwaysRun = true)
+    public static void teardownCluster() {
+        if (null != pulsarCluster) {
+            try {
+                pulsarCluster.stop();
+            } finally {
+                // Drop the reference so a later test class never stops a stale cluster twice.
+                pulsarCluster = null;
+            }
+        }
+    }
+
+    /** Returns a string of {@code length} random letters in the range 'a'..'z'. */
+    private static String randomClusterName(int length) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+        }
+        return sb.toString();
+    }
+
+}
diff --git a/tests/integration-tests-utils/pom.xml
b/tests/integration-tests-utils/pom.xml
index 71deafa..2ebed73 100644
--- a/tests/integration-tests-utils/pom.xml
+++ b/tests/integration-tests-utils/pom.xml
@@ -37,16 +37,10 @@
<name>Apache Pulsar :: Tests :: Utility module for Arquillian based
integration tests</name>
- <properties>
- <arquillian-cube.version>1.15.1</arquillian-cube.version>
- <commons-compress.version>1.15</commons-compress.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
- <version>${commons-compress.version}</version>
</dependency>
<dependency>
@@ -77,7 +71,6 @@
<dependency>
<groupId>org.arquillian.cube</groupId>
<artifactId>arquillian-cube-docker</artifactId>
- <version>${arquillian-cube.version}</version>
</dependency>
</dependencies>
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index b84e398..6788b1b 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -37,5 +37,55 @@
<module>compaction</module>
<module>cli</module>
<module>s3-offload</module>
+ <module>semantics</module>
</modules>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- only run tests when -DintegrationTests is specified //-->
+ <skipTests>true</skipTests>
+ <systemPropertyVariables>
+ <currentVersion>${project.version}</currentVersion>
+
<maven.buildDirectory>${project.build.directory}</maven.buildDirectory>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>integrationTests</id>
+ <activation>
+ <property>
+ <name>integrationTests</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <properties>
+ <property>
+ <name>listener</name>
+ <!-- AnnotationListener breaks arquillian, so don't use it
//-->
+ <value>org.apache.pulsar.tests.PulsarTestListener</value>
+ </property>
+ </properties>
+
+ <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
+ -Dio.netty.leakDetectionLevel=advanced
+ </argLine>
+ <skipTests>false</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/tests/integration-tests-topologies/pom.xml
b/tests/integration/semantics/pom.xml
similarity index 66%
copy from tests/integration-tests-topologies/pom.xml
copy to tests/integration/semantics/pom.xml
index 93fc1fe..5e8eeca 100644
--- a/tests/integration-tests-topologies/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -27,14 +27,27 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar.tests</groupId>
- <artifactId>tests-parent</artifactId>
+ <artifactId>integration</artifactId>
<version>2.2.0-incubating-SNAPSHOT</version>
</parent>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>integration-tests-topologies</artifactId>
+ <groupId>org.apache.pulsar.tests.integration</groupId>
+ <artifactId>semantics</artifactId>
<packaging>jar</packaging>
+ <name>Apache Pulsar :: Tests :: Integration Tests :: Semantics</name>
- <name>Apache Pulsar :: Tests :: Common topologies for Arquillian based
integration tests</name>
-
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>integration-tests-topologies</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
new file mode 100644
index 0000000..a87f85a
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.semantics;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testng.annotations.Test;
+
+/**
+ * Test pulsar produce/consume semantics.
+ */
+public class SemanticsTest extends PulsarClusterTestBase {
+
+    /**
+     * Publishes and consumes messages through the cluster's plain-text service url.
+     */
+    @Test
+    public void testPublishAndConsumePlainTextServiceUrl() throws Exception {
+        testPublishAndConsume(
+            pulsarCluster.getPlainTextServiceUrl(),
+            "test-publish-consume-plain-text");
+    }
+
+    /**
+     * Subscribes to {@code topicName}, publishes a fixed number of string
+     * messages to it and asserts that they are received in publish order.
+     *
+     * @param serviceUrl the service url the client connects to
+     * @param topicName the topic to publish to and consume from
+     * @throws Exception if any client operation fails
+     */
+    private void testPublishAndConsume(String serviceUrl, String topicName) throws Exception {
+
+        int numMessages = 10;
+
+        // Resources close in reverse declaration order: consumer first, then client.
+        try (PulsarClient client = PulsarClient.builder()
+                 .serviceUrl(serviceUrl)
+                 .build();
+             Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                 .topic(topicName)
+                 .subscriptionName("my-sub")
+                 .subscribe()) {
+
+            // The consumer subscribes before anything is published; the
+            // producer is closed again before the messages are read back.
+            try (Producer<String> producer = client.newProducer(Schema.STRING)
+                     .topic(topicName)
+                     .create()) {
+
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send("smoke-message-" + i);
+                }
+            }
+
+            for (int i = 0; i < numMessages; i++) {
+                Message<String> m = consumer.receive();
+                // TestNG's assertEquals takes (actual, expected).
+                assertEquals(m.getValue(), "smoke-message-" + i);
+            }
+        }
+    }
+
+}