sijie closed pull request #2356: [Integration Tests] Improve and fix
integration tests
URL: https://github.com/apache/incubator-pulsar/pull/2356
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/conf/pulsar_env.sh b/conf/pulsar_env.sh
index f600e6d20a..f21d1ddda8 100644
--- a/conf/pulsar_env.sh
+++ b/conf/pulsar_env.sh
@@ -42,7 +42,7 @@
# PULSAR_GLOBAL_ZK_CONF=
# Extra options to be passed to the jvm
-PULSAR_MEM=" -Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"
+PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
# Garbage collection options
PULSAR_GC=" -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis
-XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50
-XX:+DisableExplicitGC -XX:-ResizePLAB"
diff --git a/conf/pulsar_tools_env.sh b/conf/pulsar_tools_env.sh
index 7ad022ff43..38e2859c5a 100755
--- a/conf/pulsar_tools_env.sh
+++ b/conf/pulsar_tools_env.sh
@@ -42,7 +42,7 @@
# PULSAR_GLOBAL_ZK_CONF=
# Extra options to be passed to the jvm
-PULSAR_MEM=${PULSAR_MEM:-"-Xmx256m -XX:MaxDirectMemorySize=256m"}
+PULSAR_MEM=${PULSAR_TOOLS_MEM:-"-Xmx128m -XX:MaxDirectMemorySize=128m"}
# Garbage collection options
PULSAR_GC=" -client "
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 79b9093e7b..3e9a4267d3 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -26,7 +26,6 @@
import org.apache.pulsar.broker.web.AuthenticationFilter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -90,7 +89,6 @@ private void init() {
List<ServerConnector> connectors = new ArrayList<>();
ServerConnector connector = new ServerConnector(server, 1, 1);
connector.setPort(this.workerConfig.getWorkerPort());
- connector.setHost(this.workerConfig.getWorkerHostname());
connectors.add(connector);
List<Handler> handlers = new ArrayList<>(3);
@@ -114,7 +112,6 @@ private void init() {
this.workerConfig.isTlsRequireTrustedClientCertOnConnect());
ServerConnector tlsConnector = new ServerConnector(server, 1,
1, sslCtxFactory);
tlsConnector.setPort(this.workerConfig.getWorkerPortTls());
- tlsConnector.setHost(this.workerConfig.getWorkerHostname());
connectors.add(tlsConnector);
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf
b/tests/docker-images/latest-version-image/conf/bookie.conf
index 030d6adea4..b9dc405b82 100644
--- a/tests/docker-images/latest-version-image/conf/bookie.conf
+++ b/tests/docker-images/latest-version-image/conf/bookie.conf
@@ -22,5 +22,7 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/bookie.log
directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
+environment=dbStorage_writeCacheMaxSizeMb=16
+environment=dbStorage_readAheadCacheMaxSizeMb=16
command=/pulsar/bin/pulsar bookie
diff --git a/tests/docker-images/latest-version-image/conf/broker.conf
b/tests/docker-images/latest-version-image/conf/broker.conf
index 5492abf89c..e5b0a03fa6 100644
--- a/tests/docker-images/latest-version-image/conf/broker.conf
+++ b/tests/docker-images/latest-version-image/conf/broker.conf
@@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/broker.log
directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar broker
diff --git a/tests/docker-images/latest-version-image/conf/global-zk.conf
b/tests/docker-images/latest-version-image/conf/global-zk.conf
index 5c6edaae70..d5556c7a2c 100644
--- a/tests/docker-images/latest-version-image/conf/global-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/global-zk.conf
@@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/global-zk.log
directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar configuration-store
diff --git a/tests/docker-images/latest-version-image/conf/local-zk.conf
b/tests/docker-images/latest-version-image/conf/local-zk.conf
index 2822cb1b8c..c64dee323b 100644
--- a/tests/docker-images/latest-version-image/conf/local-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/local-zk.conf
@@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/local-zk.log
directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar zookeeper
diff --git a/tests/docker-images/latest-version-image/conf/proxy.conf
b/tests/docker-images/latest-version-image/conf/proxy.conf
index b8127505b8..dc54dd9e03 100644
--- a/tests/docker-images/latest-version-image/conf/proxy.conf
+++ b/tests/docker-images/latest-version-image/conf/proxy.conf
@@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/proxy.log
directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
command=/pulsar/bin/pulsar proxy
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index c1e61f6e28..a3a46944fc 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -131,6 +131,10 @@
-Dio.netty.leakDetectionLevel=advanced
</argLine>
<skipTests>false</skipTests>
+ <suiteXmlFiles>
+ <file>src/test/resources/pulsar.xml</file>
+ </suiteXmlFiles>
+ <forkCount>1</forkCount>
</configuration>
</plugin>
</plugins>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index acdae263be..e30a1e3305 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,23 +18,21 @@
*/
package org.apache.pulsar.tests.integration.cli;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;
/**
* Test Pulsar CLI.
*/
-public class CLITest extends PulsarClusterTestBase {
+public class CLITest extends PulsarTestSuite {
@Test
public void testDeprecatedCommands() throws Exception {
@@ -68,7 +66,7 @@ public void testCreateSubscriptionCommand() throws Exception {
for (BrokerContainer container : pulsarCluster.getBrokers()) {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
- "persistent",
+ "topics",
"create-subscription",
"persistent://public/default/" + topic,
"--subscription",
@@ -98,7 +96,7 @@ public void
testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Excep
// terminate the topic
result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
- "persistent",
+ "topics",
"terminate",
topicName);
assertTrue(result.getStdout().contains("Topic succesfully terminated
at"));
@@ -209,4 +207,5 @@ public void testSetInfiniteRetention() throws Exception {
result.getStdout().contains("\"retentionSizeInMB\" : -1"),
result.getStdout());
}
+
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 05d6235de6..fb76202e95 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -18,18 +18,21 @@
*/
package org.apache.pulsar.tests.integration.compaction;
+import static org.testng.Assert.assertEquals;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-
-public class TestCompaction extends PulsarClusterTestBase {
+/**
+ * Test cases for compaction.
+ */
+public class TestCompaction extends PulsarTestSuite {
@Test(dataProvider = "ServiceUrls")
public void testPublishCompactAndConsumeCLI(String serviceUrl) throws
Exception {
@@ -45,29 +48,42 @@ public void testPublishCompactAndConsumeCLI(String
serviceUrl) throws Exception
try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
- try(Producer<byte[]> producer =
client.newProducer().topic(topic).create()) {
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+ try(Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic).create()) {
+ producer.newMessage()
+ .key("key0")
+ .value("content0")
+ .send();
+ producer.newMessage()
+ .key("key0")
+ .value("content1")
+ .send();
}
- try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
- .readCompacted(true).subscriptionName("sub1").subscribe())
{
- Message<byte[]> m = consumer.receive();
+ try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .readCompacted(true)
+ .subscriptionName("sub1")
+ .subscribe()) {
+ Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
- assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getValue(), "content0");
m = consumer.receive();
assertEquals(m.getKey(), "key0");
- assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getValue(), "content1");
}
pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic",
"-t", topic);
- try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
- .readCompacted(true).subscriptionName("sub1").subscribe())
{
- Message<byte[]> m = consumer.receive();
+ try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .readCompacted(true)
+ .subscriptionName("sub1")
+ .subscribe()) {
+ Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
- assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getValue(), "content1");
}
}
}
@@ -89,32 +105,38 @@ public void testPublishCompactAndConsumeRest(String
serviceUrl) throws Exception
try (PulsarClient client = PulsarClient.create(serviceUrl)) {
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
- try(Producer<byte[]> producer =
client.newProducer().topic(topic).create()) {
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+ try(Producer<String> producer =
client.newProducer(Schema.STRING).topic(topic).create()) {
+ producer.newMessage()
+ .key("key0")
+ .value("content0")
+ .send();
+ producer.newMessage()
+ .key("key0")
+ .value("content1")
+ .send();
}
- try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+ try (Consumer<String> consumer =
client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
- Message<byte[]> m = consumer.receive();
+ Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
- assertEquals(m.getData(), "content0".getBytes());
+ assertEquals(m.getValue(), "content0");
m = consumer.receive();
assertEquals(m.getKey(), "key0");
- assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getValue(), "content1");
}
- pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+ pulsarCluster.runAdminCommandOnAnyBroker("topics",
"compact", topic);
- pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+ pulsarCluster.runAdminCommandOnAnyBroker("topics",
"compaction-status", "-w", topic);
- try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+ try (Consumer<String> consumer =
client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName("sub1").subscribe())
{
- Message<byte[]> m = consumer.receive();
+ Message<String> m = consumer.receive();
assertEquals(m.getKey(), "key0");
- assertEquals(m.getData(), "content1".getBytes());
+ assertEquals(m.getValue(), "content1");
}
}
}
@@ -122,21 +144,21 @@ public void testPublishCompactAndConsumeRest(String
serviceUrl) throws Exception
private static void waitAndVerifyCompacted(PulsarClient client, String
topic,
String sub, String expectedKey,
String expectedValue) throws Exception {
for (int i = 0; i < 60; i++) {
- try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+ try (Consumer<String> consumer =
client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName(sub).subscribe()) {
- Message<byte[]> m = consumer.receive();
+ Message<String> m = consumer.receive();
assertEquals(m.getKey(), expectedKey);
- if (new String(m.getData()).equals(expectedValue)) {
+ if (m.getValue() == expectedValue) {
break;
}
}
Thread.sleep(1000);
}
- try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+ try (Consumer<String> consumer =
client.newConsumer(Schema.STRING).topic(topic)
.readCompacted(true).subscriptionName(sub).subscribe()) {
- Message<byte[]> m = consumer.receive();
+ Message<String> m = consumer.receive();
assertEquals(m.getKey(), expectedKey);
- assertEquals(new String(m.getData()), expectedValue);
+ assertEquals(m.getValue(), expectedValue);
}
}
@@ -154,18 +176,27 @@ public void testPublishWithAutoCompaction(String
serviceUrl) throws Exception {
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-compaction-threshold", "--threshold", "1", namespace);
- try (PulsarClient client = PulsarClient.create(serviceUrl)) {
-
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
-
- try(Producer<byte[]> producer =
client.newProducer().topic(topic).create()) {
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+
client.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub1").subscribe().close();
+
+ try(Producer<String> producer =
client.newProducer(Schema.STRING).topic(topic).create()) {
+ producer.newMessage()
+ .key("key0")
+ .value("content0")
+ .send();
+ producer.newMessage()
+ .key("key0")
+ .value("content1")
+ .send();
}
waitAndVerifyCompacted(client, topic, "sub1", "key0", "content1");
- try(Producer<byte[]> producer =
client.newProducer().topic(topic).create()) {
-
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+ try(Producer<String> producer =
client.newProducer(Schema.STRING).topic(topic).create()) {
+ producer.newMessage()
+ .key("key0")
+ .value("content2")
+ .send();
}
waitAndVerifyCompacted(client, topic, "sub1", "key0", "content2");
}
@@ -191,4 +222,5 @@ private ContainerExecResult createNamespace(final String
Ns) throws Exception {
return result;
}
+
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 2971c59c28..ad77b6280f 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
/**
* Abstract Test Container for Pulsar.
@@ -46,6 +47,7 @@
private final String serviceEntryPoint;
private final int servicePort;
private final int httpPort;
+ private final String httpPath;
public PulsarContainer(String clusterName,
String hostname,
@@ -53,12 +55,23 @@ public PulsarContainer(String clusterName,
String serviceEntryPoint,
int servicePort,
int httpPort) {
+ this(clusterName, hostname, serviceName, serviceEntryPoint,
servicePort, httpPort, "/metrics");
+ }
+
+ public PulsarContainer(String clusterName,
+ String hostname,
+ String serviceName,
+ String serviceEntryPoint,
+ int servicePort,
+ int httpPort,
+ String httpPath) {
super(clusterName, IMAGE_NAME);
this.hostname = hostname;
this.serviceName = serviceName;
this.serviceEntryPoint = serviceEntryPoint;
this.servicePort = servicePort;
this.httpPort = httpPort;
+ this.httpPath = httpPath;
}
@Override
@@ -90,7 +103,13 @@ protected void configure() {
@Override
public void start() {
- if (httpPort > 0 || servicePort > 0) {
+ if (httpPort > 0 && servicePort < 0) {
+ this.waitStrategy = new HttpWaitStrategy()
+ .forPort(httpPort)
+ .forStatusCode(200)
+ .forPath(httpPath)
+ .withStartupTimeout(Duration.of(300, SECONDS));
+ } else if (httpPort > 0 || servicePort > 0) {
this.waitStrategy = new HostPortWaitStrategy()
.withStartupTimeout(Duration.of(300, SECONDS));
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
index 03e2053b00..dcc0999bac 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
@@ -27,6 +27,12 @@
public WorkerContainer(String clusterName, String hostname) {
super(
- clusterName, hostname, hostname, "bin/run-functions-worker.sh",
-1, BROKER_HTTP_PORT);
+ clusterName,
+ hostname,
+ hostname,
+ "bin/run-functions-worker.sh",
+ -1,
+ BROKER_HTTP_PORT,
+ "/admin/v2/functions/cluster");
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/FunctionsCLITest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/FunctionsCLITest.java
new file mode 100644
index 0000000000..4989f4fc13
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/FunctionsCLITest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class FunctionsCLITest extends PulsarTestSuite {
+
+ //
+ // Tests on uploading/downloading function packages.
+ //
+
+ public String checkUpload() throws Exception {
+ String bkPkgPath = String.format("%s/%s/%s",
+ "tenant-" + randomName(8),
+ "ns-" + randomName(8),
+ "fn-" + randomName(8));
+
+ UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createUploader(
+ PulsarCluster.ADMIN_SCRIPT,
+ bkPkgPath);
+ String actualCommand = generator.generateCommand();
+
+ log.info(actualCommand);
+
+ String[] commands = {
+ "sh", "-c", actualCommand
+ };
+ ContainerExecResult output =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertEquals(0, output.getExitCode());
+ assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
+ return bkPkgPath;
+ }
+
+ @Test
+ public void checkDownload() throws Exception {
+ String bkPkgPath = checkUpload();
+ String localPkgFile = "/tmp/checkdownload-" + randomName(16);
+
+ UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createDownloader(
+ localPkgFile,
+ bkPkgPath);
+ String actualCommand = generator.generateCommand();
+
+ log.info(actualCommand);
+
+ String[] commands = {
+ "sh", "-c", actualCommand
+ };
+ WorkerContainer container = pulsarCluster.getAnyWorker();
+ ContainerExecResult output = container.execCmd(commands);
+ assertEquals(0, output.getExitCode());
+ assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
+ String[] diffCommand = {
+ "diff",
+ PulsarCluster.ADMIN_SCRIPT,
+ localPkgFile
+ };
+ output = container.execCmd(diffCommand);
+ assertEquals(0, output.getExitCode());
+ assertTrue(output.getStdout().isEmpty());
+ assertTrue(output.getStderr().isEmpty());
+ }
+
+
+
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsProcessTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsProcessTest.java
new file mode 100644
index 0000000000..ec36d528f5
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsProcessTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
+
+/**
+ * Process based test.
+ */
+public class PulsarFunctionsProcessTest extends PulsarFunctionsTest {
+ public PulsarFunctionsProcessTest() {
+ super(FunctionRuntimeType.PROCESS);
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index a541c092bd..57a6ba4dca 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -20,73 +20,607 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.tests.integration.containers.WorkerContainer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.io.SinkTester;
+import org.apache.pulsar.tests.integration.io.SourceTester;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.annotations.Test;
+/**
+ * A test base for testing sink.
+ */
@Slf4j
-public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+
+ PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
+ super(functionRuntimeType);
+ }
+
+ @Test
+ public void testKafkaSink() throws Exception {
+ testSink(new KafkaSinkTester());
+ }
+
+ @Test
+ public void testCassandraSink() throws Exception {
+ testSink(new CassandraSinkTester());
+ }
+
+ private void testSink(SinkTester tester) throws Exception {
+ tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
+
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String inputTopicName = "test-sink-connector-"
+ + functionRuntimeType + "-input-topic-" + randomName(8);
+ final String sinkName = "test-sink-connector-"
+ + functionRuntimeType + "-name-" + randomName(8);
+ final int numMessages = 20;
+
+ // prepare the testing environment for sink
+ prepareSink(tester);
+
+ // submit the sink connector
+ submitSinkConnector(tester, tenant, namespace, sinkName,
inputTopicName);
+
+ // get sink info
+ getSinkInfoSuccess(tester, tenant, namespace, sinkName);
+
+ // get sink status
+ getSinkStatus(tenant, namespace, sinkName);
+
+ // produce messages
+ Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName,
numMessages);
+
+ // wait for sink to process messages
+ waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+
+ // validate the sink result
+ tester.validateSinkResult(kvs);
+
+ // delete the sink
+ deleteSink(tenant, namespace, sinkName);
+
+ // get sink info (sink should be deleted)
+ getSinkInfoNotFound(tenant, namespace, sinkName);
+ }
+
+ protected void prepareSink(SinkTester tester) throws Exception {
+ tester.prepareSink();
+ }
+
+ protected void submitSinkConnector(SinkTester tester,
+ String tenant,
+ String namespace,
+ String sinkName,
+ String inputTopicName) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--sink-type", tester.sinkType(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ protected void getSinkInfoSuccess(SinkTester tester,
+ String tenant,
+ String namespace,
+ String sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"" +
tester.getSinkType() + "\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSinkStatus(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ while (true) {
+ try {
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
+ if (result.getStdout().contains("\"running\": true")) {
+ return;
+ }
+ } catch (ContainerExecException e) {
+ // expected in early iterations
+ }
+ log.info("Backoff 1 second until the function is running");
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected Map<String, String> produceMessagesToInputTopic(String
inputTopicName,
+ int numMessages)
throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopicName)
+ .create();
+ LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+ String value = "value-" + i;
+ kvs.put(key, value);
+ producer.newMessage()
+ .key(key)
+ .value(value)
+ .send();
+ }
+ return kvs;
+ }
+
+ protected void waitForProcessingMessages(String tenant,
+ String namespace,
+ String sinkName,
+ int numMessages) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ try {
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
+ if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ return;
+ }
+ } catch (ContainerExecException e) {
+ // expected in early iterations
+ }
+
+ log.info("{} ms has elapsed but the sink hasn't process {}
messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void deleteSink(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Deleted successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSinkInfoNotFound(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ try {
+ pulsarCluster.getAnyWorker().execCmd(commands);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ sinkName + " doesn't exist"));
+ }
+ }
//
- // Tests on uploading/downloading function packages.
+ // Source Test
//
- public String checkUpload() throws Exception {
- String bkPkgPath = String.format("%s/%s/%s",
- "tenant-" + randomName(8),
- "ns-" + randomName(8),
- "fn-" + randomName(8));
+ @Test
+ public void testKafkaSource() throws Exception {
+ testSource(new KafkaSourceTester());
+ }
+
+ private void testSource(SourceTester tester) throws Exception {
+ tester.findSourceServiceContainer(pulsarCluster.getExternalServices());
+
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String outputTopicName = "test-source-connector-"
+ + functionRuntimeType + "-output-topic-" + randomName(8);
+ final String sourceName = "test-source-connector-"
+ + functionRuntimeType + "-name-" + randomName(8);
+ final int numMessages = 20;
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(outputTopicName)
+ .subscriptionName("source-tester")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ // prepare the testing environment for source
+ prepareSource(tester);
- UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createUploader(
+ // submit the source connector
+ submitSourceConnector(tester, tenant, namespace, sourceName,
outputTopicName);
+
+ // get source info
+ getSourceInfoSuccess(tester, tenant, namespace, sourceName);
+
+ // get source status
+ getSourceStatus(tenant, namespace, sourceName);
+
+ // produce messages
+ Map<String, String> kvs = tester.produceSourceMessages(numMessages);
+
+ // wait for source to process messages
+ waitForProcessingSourceMessages(tenant, namespace, sourceName,
numMessages);
+
+ // validate the source result
+ validateSourceResult(consumer, kvs);
+
+ // delete the source
+ deleteSource(tenant, namespace, sourceName);
+
+ // get source info (source should be deleted)
+ getSourceInfoNotFound(tenant, namespace, sourceName);
+ }
+
+ protected void prepareSource(SourceTester tester) throws Exception {
+ tester.prepareSource();
+ }
+
+ protected void submitSourceConnector(SourceTester tester,
+ String tenant,
+ String namespace,
+ String sourceName,
+ String outputTopicName) throws
Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "source", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName,
+ "--source-type", tester.sourceType(),
+ "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+ "--destinationTopicName", outputTopicName
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ protected void getSourceInfoSuccess(SourceTester tester,
+ String tenant,
+ String namespace,
+ String sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"" +
tester.getSourceType() + "\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSourceStatus(String tenant, String namespace, String
sourceName) throws Exception {
+ String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
- bkPkgPath);
- String actualCommand = generator.generateCommand();
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ while (true) {
+ try {
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+ if (result.getStdout().contains("\"running\": true")) {
+ return;
+ }
+ } catch (ContainerExecException e) {
+ // expected for early iterations
+ }
+ log.info("Backoff 1 second until the function is running");
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
- log.info(actualCommand);
+ protected void validateSourceResult(Consumer<String> consumer,
+ Map<String, String> kvs) throws
Exception {
+ for (Map.Entry<String, String> kv : kvs.entrySet()) {
+ Message<String> msg = consumer.receive();
+ assertEquals(kv.getKey(), msg.getKey());
+ assertEquals(kv.getValue(), msg.getValue());
+ }
+ }
+
+ protected void waitForProcessingSourceMessages(String tenant,
+ String namespace,
+ String sourceName,
+ int numMessages) throws
Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ try {
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get source status : {}", result.getStdout());
+ if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ return;
+ }
+ } catch (ContainerExecException e) {
+ // expected for early iterations
+ }
+ log.info("{} ms has elapsed but the source hasn't process {}
messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ protected void deleteSource(String tenant, String namespace, String
sourceName) throws Exception {
String[] commands = {
- "sh", "-c", actualCommand
+ PulsarCluster.ADMIN_SCRIPT,
+ "source",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
};
- ContainerExecResult output =
pulsarCluster.getAnyWorker().execCmd(commands);
- assertEquals(0, output.getExitCode());
- assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
- return bkPkgPath;
+ ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Delete source successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSourceInfoNotFound(String tenant, String namespace,
String sourceName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sourceName
+ };
+ try {
+ pulsarCluster.getAnyWorker().execCmd(commands);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ sourceName + " doesn't exist"));
+ }
+ }
+
+ //
+ // Test CRUD functions on different runtimes.
+ //
+
+ @Test
+ public void testPythonExclamationFunction() throws Exception {
+ testExclamationFunction(Runtime.PYTHON);
}
@Test
- public void checkDownload() throws Exception {
- String bkPkgPath = checkUpload();
- String localPkgFile = "/tmp/checkdownload-" + randomName(16);
+ public void testJavaExclamationFunction() throws Exception {
+ testExclamationFunction(Runtime.JAVA);
+ }
+
+ private void testExclamationFunction(Runtime runtime) throws Exception {
+ if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime ==
Runtime.PYTHON) {
+ // python can only run on process mode
+ return;
+ }
+
+ String inputTopicName = "test-exclamation-" + runtime + "-input-" +
randomName(8);
+ String outputTopicName = "test-exclamation-" + runtime + "-output-" +
randomName(8);
+ String functionName = "test-exclamation-fn-" + randomName(8);
+ final int numMessages = 10;
+
+ // submit the exclamation function
+ submitExclamationFunction(
+ runtime, inputTopicName, outputTopicName, functionName);
+
+ // get function info
+ getFunctionInfoSuccess(functionName);
+
+ // publish and consume result
+ publishAndConsumeMessages(inputTopicName, outputTopicName,
numMessages);
- UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createDownloader(
- localPkgFile,
- bkPkgPath);
- String actualCommand = generator.generateCommand();
+ // get function status
+ getFunctionStatus(functionName, numMessages);
- log.info(actualCommand);
+ // delete function
+ deleteFunction(functionName);
+ // get function info
+ getFunctionInfoNotFound(functionName);
+ }
+
+ private static void submitExclamationFunction(Runtime runtime,
+ String inputTopicName,
+ String outputTopicName,
+ String functionName) throws
Exception {
+ CommandGenerator generator;
+ generator = CommandGenerator.createDefaultGenerator(inputTopicName,
getExclamationClass(runtime));
+ generator.setSinkTopic(outputTopicName);
+ generator.setFunctionName(functionName);
+ String command;
+ if (Runtime.JAVA == runtime) {
+ command = generator.generateCreateFunctionCommand();
+ } else if (Runtime.PYTHON == runtime) {
+ generator.setRuntime(runtime);
+ command =
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
+ } else {
+ throw new IllegalArgumentException("Unsupported runtime : " +
runtime);
+ }
String[] commands = {
- "sh", "-c", actualCommand
+ "sh", "-c", command
};
- WorkerContainer container = pulsarCluster.getAnyWorker();
- ContainerExecResult output = container.execCmd(commands);
- assertEquals(0, output.getExitCode());
- assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
- String[] diffCommand = {
- "diff",
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ commands);
+ assertTrue(result.getStdout().contains("\"Created successfully\""));
+ }
+
+ private static void getFunctionInfoSuccess(String functionName) throws
Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
- localPkgFile
- };
- output = container.execCmd(diffCommand);
- assertEquals(0, output.getExitCode());
- assertTrue(output.getStdout().isEmpty());
- assertTrue(output.getStderr().isEmpty());
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
}
+ private static void getFunctionInfoNotFound(String functionName) throws
Exception {
+ try {
+ pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ functionName + " doesn't exist"));
+ }
+ }
+ private static void getFunctionStatus(String functionName, int
numMessages) throws Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("\"running\": true"));
+ assertTrue(result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\""));
+ assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\""));
+ }
+
+ private static void publishAndConsumeMessages(String inputTopic,
+ String outputTopic,
+ int numMessages) throws
Exception {
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(outputTopic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+ @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopic)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("message-" + i);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals("message-" + i + "!", msg.getValue());
+ }
+ }
+
+ private static void deleteFunction(String functionName) throws Exception {
+ ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "delete",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("Deleted successfully"));
+ assertTrue(result.getStderr().isEmpty());
+ }
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8e82662bad..fe4795d96e 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -20,16 +20,17 @@
import lombok.extern.slf4j.Slf4j;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
/**
* A cluster to run pulsar functions for testing functions related features.
*/
@Slf4j
-public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
@DataProvider(name = "FunctionRuntimeTypes")
public static Object[][] getData() {
@@ -49,12 +50,20 @@ protected PulsarFunctionsTestBase(FunctionRuntimeType
functionRuntimeType) {
this.functionRuntimeType = functionRuntimeType;
}
- protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
- String clusterName,
- PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
- return super.beforeSetupCluster(clusterName, specBuilder)
- .functionRuntimeType(functionRuntimeType)
- .numFunctionWorkers(2);
+ @BeforeClass
+ public void setupFunctionWorkers() {
+ final int numFunctionWorkers = 2;
+ log.info("Setting up {} function workers : function runtime type = {}",
+ numFunctionWorkers, functionRuntimeType);
+ pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType,
numFunctionWorkers);
+ log.info("{} function workers has started", numFunctionWorkers);
+ }
+
+ @AfterClass
+ public void teardownFunctionWorkers() {
+ log.info("Tearing down function workers ...");
+ pulsarCluster.stopWorkers();
+ log.info("All functions workers are stopped.");
}
//
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsThreadTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsThreadTest.java
new file mode 100644
index 0000000000..d1c132b3b3
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsThreadTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
+
+/**
+ * Thread based test.
+ */
+public class PulsarFunctionsThreadTest extends PulsarFunctionsTest {
+ public PulsarFunctionsThreadTest() {
+ super(FunctionRuntimeType.THREAD);
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
deleted file mode 100644
index a69d13745f..0000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.functions.runtime;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
-import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-
-/**
- * The tests that run over different container mode.
- */
-public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
-
- @Factory(dataProvider = "FunctionRuntimeTypes")
- PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
- super(functionRuntimeType);
- }
-
- //
- // Test CRUD functions on different runtimes.
- //
-
- @Test(dataProvider = "FunctionRuntimes")
- public void testExclamationFunction(Runtime runtime) throws Exception {
- if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime ==
Runtime.PYTHON) {
- // python can only run on process mode
- return;
- }
-
- String inputTopicName = "test-exclamation-" + runtime + "-input-" +
randomName(8);
- String outputTopicName = "test-exclamation-" + runtime + "-output-" +
randomName(8);
- String functionName = "test-exclamation-fn-" + randomName(8);
- final int numMessages = 10;
-
- // submit the exclamation function
- submitExclamationFunction(
- runtime, inputTopicName, outputTopicName, functionName);
-
- // get function info
- getFunctionInfoSuccess(functionName);
-
- // publish and consume result
- publishAndConsumeMessages(inputTopicName, outputTopicName,
numMessages);
-
- // get function status
- getFunctionStatus(functionName, numMessages);
-
- // delete function
- deleteFunction(functionName);
-
- // get function info
- getFunctionInfoNotFound(functionName);
- }
-
- private static void submitExclamationFunction(Runtime runtime,
- String inputTopicName,
- String outputTopicName,
- String functionName) throws
Exception {
- CommandGenerator generator;
- generator = CommandGenerator.createDefaultGenerator(inputTopicName,
getExclamationClass(runtime));
- generator.setSinkTopic(outputTopicName);
- generator.setFunctionName(functionName);
- String command;
- if (Runtime.JAVA == runtime) {
- command = generator.generateCreateFunctionCommand();
- } else if (Runtime.PYTHON == runtime) {
- generator.setRuntime(runtime);
- command =
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
- } else {
- throw new IllegalArgumentException("Unsupported runtime : " +
runtime);
- }
- String[] commands = {
- "sh", "-c", command
- };
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
- commands);
- assertTrue(result.getStdout().contains("\"Created successfully\""));
- }
-
- private static void getFunctionInfoSuccess(String functionName) throws
Exception {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
- }
-
- private static void getFunctionInfoNotFound(String functionName) throws
Exception {
- try {
- pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName);
- fail("Command should have exited with non-zero");
- } catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ functionName + " doesn't exist"));
- }
- }
-
- private static void getFunctionStatus(String functionName, int
numMessages) throws Exception {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "getstatus",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStdout().contains("\"running\": true"));
- assertTrue(result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\""));
- assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\""));
- }
-
- private static void publishAndConsumeMessages(String inputTopic,
- String outputTopic,
- int numMessages) throws
Exception {
- @Cleanup PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
- @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(outputTopic)
- .subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName("test-sub")
- .subscribe();
- @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
- .topic(inputTopic)
- .create();
-
- for (int i = 0; i < numMessages; i++) {
- producer.send("message-" + i);
- }
-
- for (int i = 0; i < numMessages; i++) {
- Message<String> msg = consumer.receive();
- assertEquals("message-" + i + "!", msg.getValue());
- }
- }
-
- private static void deleteFunction(String functionName) throws Exception {
- ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "delete",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStdout().contains("Deleted successfully"));
- assertTrue(result.getStderr().isEmpty());
- }
-
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index 981ffbb225..e31de9f2ab 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -25,11 +25,11 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.CassandraContainer;
import org.testcontainers.containers.GenericContainer;
-import org.testng.collections.Maps;
import java.util.List;
import java.util.Map;
+import static com.google.common.base.Preconditions.checkState;
import static
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -40,6 +40,8 @@
@Slf4j
public class CassandraSinkTester extends SinkTester {
+ private static final String NAME = "cassandra";
+
private static final String ROOTS = "cassandra";
private static final String KEY = "key";
private static final String COLUMN = "col";
@@ -67,15 +69,16 @@ public CassandraSinkTester() {
}
@Override
- protected Map<String, GenericContainer<?>> newSinkService(String
clusterName) {
- this.cassandraCluster = new CassandraContainer(clusterName);
- Map<String, GenericContainer<?>> containers = Maps.newHashMap();
- containers.put(CassandraContainer.NAME, cassandraCluster);
- return containers;
+ public void findSinkServiceContainer(Map<String, GenericContainer<?>>
containers) {
+ GenericContainer<?> container = containers.get(NAME);
+ checkState(container instanceof CassandraContainer,
+ "No kafka service found in the cluster");
+
+ this.cassandraCluster = (CassandraContainer) container;
}
@Override
- protected void prepareSink() {
+ public void prepareSink() {
// build the sink
cluster = Cluster.builder()
.addContactPoint("localhost")
@@ -101,7 +104,7 @@ protected void prepareSink() {
}
@Override
- protected void validateSinkResult(Map<String, String> kvs) {
+ public void validateSinkResult(Map<String, String> kvs) {
String query = "SELECT * FROM " + tableName + ";";
ResultSet result = session.execute(query);
List<Row> rows = result.all();
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 2cad433d33..1cd58f26de 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io;
+import static com.google.common.base.Preconditions.checkState;
import static
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -35,7 +36,6 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-import org.testng.collections.Maps;
/**
* A tester for testing kafka sink.
@@ -64,21 +64,16 @@ public KafkaSinkTester() {
}
@Override
- protected Map<String, GenericContainer<?>> newSinkService(String
clusterName) {
- this.kafkaContainer = new KafkaContainer()
- .withEmbeddedZookeeper()
- .withNetworkAliases(NAME)
- .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
- .withName(NAME)
- .withHostName(clusterName + "-" + NAME));
+ public void findSinkServiceContainer(Map<String, GenericContainer<?>>
containers) {
+ GenericContainer<?> container = containers.get(NAME);
+ checkState(container instanceof KafkaContainer,
+ "No kafka service found in the cluster");
- Map<String, GenericContainer<?>> containers = Maps.newHashMap();
- containers.put("kafka", kafkaContainer);
- return containers;
+ this.kafkaContainer = (KafkaContainer) container;
}
@Override
- protected void prepareSink() throws Exception {
+ public void prepareSink() throws Exception {
ExecResult execResult = kafkaContainer.execInContainer(
"/usr/bin/kafka-topics",
"--create",
@@ -108,7 +103,7 @@ protected void prepareSink() throws Exception {
}
@Override
- protected void validateSinkResult(Map<String, String> kvs) {
+ public void validateSinkResult(Map<String, String> kvs) {
Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
while (kvIter.hasNext()) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index a23fba5dea..4928f00cfb 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io;
+import static com.google.common.base.Preconditions.checkState;
import static
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.testng.Assert.assertTrue;
@@ -37,7 +38,6 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-import org.testng.collections.Maps;
/**
* A tester for testing kafka source.
@@ -68,21 +68,16 @@ public KafkaSourceTester() {
}
@Override
- protected Map<String, GenericContainer<?>> newSourceService(String
clusterName) {
- this.kafkaContainer = new KafkaContainer()
- .withEmbeddedZookeeper()
- .withNetworkAliases(NAME)
- .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
- .withName(NAME)
- .withHostName(clusterName + "-" + NAME));
+ public void findSourceServiceContainer(Map<String, GenericContainer<?>>
containers) {
+ GenericContainer<?> container = containers.get(NAME);
+ checkState(container instanceof KafkaContainer,
+ "No kafka service found in the cluster");
- Map<String, GenericContainer<?>> containers = Maps.newHashMap();
- containers.put("kafka", kafkaContainer);
- return containers;
+ this.kafkaContainer = (KafkaContainer) container;
}
@Override
- protected void prepareSource() throws Exception {
+ public void prepareSource() throws Exception {
ExecResult execResult = kafkaContainer.execInContainer(
"/usr/bin/kafka-topics",
"--create",
@@ -112,7 +107,7 @@ protected void prepareSource() throws Exception {
}
@Override
- protected Map<String, String> produceSourceMessages(int numMessages)
throws Exception{
+ public Map<String, String> produceSourceMessages(int numMessages) throws
Exception{
KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
deleted file mode 100644
index d1c80eaf2f..0000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import com.google.common.base.Stopwatch;
-import com.google.gson.Gson;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
-import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.GenericContainer;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-/**
- * A test base for testing sink.
- */
-@Slf4j
-public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
-
- @DataProvider(name = "Sinks")
- public static Object[][] getData() {
- return new Object[][] {
- { FunctionRuntimeType.PROCESS, new CassandraSinkTester() },
- { FunctionRuntimeType.THREAD, new CassandraSinkTester() },
- { FunctionRuntimeType.PROCESS, new KafkaSinkTester() },
- { FunctionRuntimeType.THREAD, new KafkaSinkTester() }
- };
- }
-
- protected final SinkTester tester;
-
- @Factory(dataProvider = "Sinks")
- PulsarIOSinkTest(FunctionRuntimeType functionRuntimeType, SinkTester
tester) {
- super(functionRuntimeType);
- this.tester = tester;
- }
-
- @Override
- protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
-
PulsarClusterSpecBuilder specBuilder) {
- Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
- externalServices.putAll(tester.newSinkService(clusterName));
- return super.beforeSetupCluster(clusterName, specBuilder)
- .externalServices(externalServices);
- }
-
- @Test
- public void testSink() throws Exception {
- final String tenant = TopicName.PUBLIC_TENANT;
- final String namespace = TopicName.DEFAULT_NAMESPACE;
- final String inputTopicName = "test-sink-connector-input-topic-" +
randomName(8);
- final String sinkName = "test-sink-connector-name-" + randomName(8);
- final int numMessages = 20;
-
- // prepare the testing environment for sink
- prepareSink();
-
- // submit the sink connector
- submitSinkConnector(tenant, namespace, sinkName, inputTopicName);
-
- // get sink info
- getSinkInfoSuccess(tenant, namespace, sinkName);
-
- // get sink status
- getSinkStatus(tenant, namespace, sinkName);
-
- // produce messages
- Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName,
numMessages);
-
- // wait for sink to process messages
- waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
-
- // validate the sink result
- tester.validateSinkResult(kvs);
-
- // delete the sink
- deleteSink(tenant, namespace, sinkName);
-
- // get sink info (sink should be deleted)
- getSinkInfoNotFound(tenant, namespace, sinkName);
- }
-
- protected void prepareSink() throws Exception {
- tester.prepareSink();
- }
-
- protected void submitSinkConnector(String tenant,
- String namespace,
- String sinkName,
- String inputTopicName) throws Exception
{
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "sink", "create",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName,
- "--sink-type", tester.sinkType(),
- "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
- "--inputs", inputTopicName
- };
- log.info("Run command : {}", StringUtils.join(commands, ' '));
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- assertTrue(
- result.getStdout().contains("\"Created successfully\""),
- result.getStdout());
- }
-
- protected void getSinkInfoSuccess(String tenant, String namespace, String
sinkName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName
- };
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get sink info : {}", result.getStdout());
- assertTrue(
- result.getStdout().contains("\"builtin\": \"" + tester.sinkType +
"\""),
- result.getStdout()
- );
- }
-
- protected void getSinkStatus(String tenant, String namespace, String
sinkName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "getstatus",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName
- };
- while (true) {
- try {
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get sink status : {}", result.getStdout());
- if (result.getStdout().contains("\"running\": true")) {
- return;
- }
- } catch (ContainerExecException e) {
- // expected in early iterations
- }
- log.info("Backoff 1 second until the function is running");
- TimeUnit.SECONDS.sleep(1);
- }
- }
-
- protected Map<String, String> produceMessagesToInputTopic(String
inputTopicName,
- int numMessages)
throws Exception {
- @Cleanup
- PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
- @Cleanup
- Producer<String> producer = client.newProducer(Schema.STRING)
- .topic(inputTopicName)
- .create();
- LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
- for (int i = 0; i < numMessages; i++) {
- String key = "key-" + i;
- String value = "value-" + i;
- kvs.put(key, value);
- producer.newMessage()
- .key(key)
- .value(value)
- .send();
- }
- return kvs;
- }
-
- protected void waitForProcessingMessages(String tenant,
- String namespace,
- String sinkName,
- int numMessages) throws Exception
{
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "getstatus",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName
- };
- Stopwatch stopwatch = Stopwatch.createStarted();
- while (true) {
- try {
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get sink status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
- return;
- }
- } catch (ContainerExecException e) {
- // expected in early iterations
- }
-
- log.info("{} ms has elapsed but the sink hasn't process {}
messages, backoff to wait for another 1 second",
- stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
- TimeUnit.SECONDS.sleep(1);
- }
- }
-
- protected void deleteSink(String tenant, String namespace, String
sinkName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "sink",
- "delete",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName
- };
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- assertTrue(
- result.getStdout().contains("Deleted successfully"),
- result.getStdout()
- );
- assertTrue(
- result.getStderr().isEmpty(),
- result.getStderr()
- );
- }
-
- protected void getSinkInfoNotFound(String tenant, String namespace, String
sinkName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName
- };
- try {
- pulsarCluster.getAnyWorker().execCmd(commands);
- fail("Command should have exited with non-zero");
- } catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ sinkName + " doesn't exist"));
- }
- }
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
deleted file mode 100644
index 8bd76ef826..0000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import com.google.common.base.Stopwatch;
-import com.google.gson.Gson;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
-import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.GenericContainer;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-/**
- * A test base for testing source.
- */
-@Slf4j
-public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
-
- @DataProvider(name = "Sources")
- public static Object[][] getData() {
- return new Object[][] {
- { FunctionRuntimeType.PROCESS, new KafkaSourceTester() },
- { FunctionRuntimeType.THREAD, new KafkaSourceTester() }
- };
- }
-
- protected final SourceTester tester;
-
- @Factory(dataProvider = "Sources")
- PulsarIOSourceTest(FunctionRuntimeType functionRuntimeType, SourceTester
tester) {
- super(functionRuntimeType);
- this.tester = tester;
- }
-
- @Override
- protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
-
PulsarClusterSpecBuilder specBuilder) {
- Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
- externalServices.putAll(tester.newSourceService(clusterName));
- return super.beforeSetupCluster(clusterName, specBuilder)
- .externalServices(externalServices);
- }
-
- @Test
- public void testSource() throws Exception {
- final String tenant = TopicName.PUBLIC_TENANT;
- final String namespace = TopicName.DEFAULT_NAMESPACE;
- final String outputTopicName = "test-source-connector-output-topic-" +
randomName(8);
- final String sourceName = "test-source-connector-name-" +
randomName(8);
- final int numMessages = 20;
-
- @Cleanup
- PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
-
- @Cleanup
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(outputTopicName)
- .subscriptionName("source-tester")
- .subscriptionType(SubscriptionType.Exclusive)
- .subscribe();
-
- // prepare the testing environment for source
- prepareSource();
-
- // submit the source connector
- submitSourceConnector(tenant, namespace, sourceName, outputTopicName);
-
- // get source info
- getSourceInfoSuccess(tenant, namespace, sourceName);
-
- // get source status
- getSourceStatus(tenant, namespace, sourceName);
-
- // produce messages
- Map<String, String> kvs = tester.produceSourceMessages(numMessages);
-
- // wait for source to process messages
- waitForProcessingMessages(tenant, namespace, sourceName, numMessages);
-
- // validate the source result
- validateSourceResult(consumer, kvs);
-
- // delete the source
- deleteSource(tenant, namespace, sourceName);
-
- // get source info (source should be deleted)
- getSourceInfoNotFound(tenant, namespace, sourceName);
- }
-
- protected void prepareSource() throws Exception {
- tester.prepareSource();
- }
-
- protected void submitSourceConnector(String tenant,
- String namespace,
- String sourceName,
- String outputTopicName) throws
Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "source", "create",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sourceName,
- "--source-type", tester.sourceType(),
- "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
- "--destinationTopicName", outputTopicName
- };
- log.info("Run command : {}", StringUtils.join(commands, ' '));
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- assertTrue(
- result.getStdout().contains("\"Created successfully\""),
- result.getStdout());
- }
-
- protected void getSourceInfoSuccess(String tenant, String namespace,
String sourceName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sourceName
- };
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get source info : {}", result.getStdout());
- assertTrue(
- result.getStdout().contains("\"builtin\": \"" + tester.sourceType
+ "\""),
- result.getStdout()
- );
- }
-
- protected void getSourceStatus(String tenant, String namespace, String
sourceName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "getstatus",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sourceName
- };
- while (true) {
- try {
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get source status : {}", result.getStdout());
- if (result.getStdout().contains("\"running\": true")) {
- return;
- }
- } catch (ContainerExecException e) {
- // expected for early iterations
- }
- log.info("Backoff 1 second until the function is running");
- TimeUnit.SECONDS.sleep(1);
- }
- }
-
- protected void validateSourceResult(Consumer<String> consumer,
- Map<String, String> kvs) throws
Exception {
- for (Map.Entry<String, String> kv : kvs.entrySet()) {
- Message<String> msg = consumer.receive();
- assertEquals(kv.getKey(), msg.getKey());
- assertEquals(kv.getValue(), msg.getValue());
- }
- }
-
- protected void waitForProcessingMessages(String tenant,
- String namespace,
- String sourceName,
- int numMessages) throws Exception
{
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "getstatus",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sourceName
- };
- Stopwatch stopwatch = Stopwatch.createStarted();
- while (true) {
- try {
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- log.info("Get source status : {}", result.getStdout());
- if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
- return;
- }
- } catch (ContainerExecException e) {
- // expected for early iterations
- }
- log.info("{} ms has elapsed but the source hasn't process {}
messages, backoff to wait for another 1 second",
- stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
- TimeUnit.SECONDS.sleep(1);
- }
- }
-
- protected void deleteSource(String tenant, String namespace, String
sourceName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "source",
- "delete",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sourceName
- };
- ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
- assertTrue(
- result.getStdout().contains("Delete source successfully"),
- result.getStdout()
- );
- assertTrue(
- result.getStderr().isEmpty(),
- result.getStderr()
- );
- }
-
- protected void getSourceInfoNotFound(String tenant, String namespace,
String sourceName) throws Exception {
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sourceName
- };
- try {
- pulsarCluster.getAnyWorker().execCmd(commands);
- fail("Command should have exited with non-zero");
- } catch (ContainerExecException e) {
- assertTrue(e.getResult().getStderr().contains("Reason: Function "
+ sourceName + " doesn't exist"));
- }
- }
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 75434be529..3eee8219fa 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -19,34 +19,36 @@
package org.apache.pulsar.tests.integration.io;
import java.util.Map;
+import lombok.Getter;
import org.testcontainers.containers.GenericContainer;
import org.testng.collections.Maps;
/**
* A tester used for testing a specific sink.
*/
+@Getter
public abstract class SinkTester {
protected final String sinkType;
protected final Map<String, Object> sinkConfig;
- protected SinkTester(String sinkType) {
+ public SinkTester(String sinkType) {
this.sinkType = sinkType;
this.sinkConfig = Maps.newHashMap();
}
- protected abstract Map<String, GenericContainer<?>> newSinkService(String
clusterName);
+ public abstract void findSinkServiceContainer(Map<String,
GenericContainer<?>> externalServices);
- protected String sinkType() {
+ public String sinkType() {
return sinkType;
}
- protected Map<String, Object> sinkConfig() {
+ public Map<String, Object> sinkConfig() {
return sinkConfig;
}
- protected abstract void prepareSink() throws Exception;
+ public abstract void prepareSink() throws Exception;
- protected abstract void validateSinkResult(Map<String, String> kvs);
+ public abstract void validateSinkResult(Map<String, String> kvs);
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index a28d0861d2..dc58f2ffbb 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -19,12 +19,14 @@
package org.apache.pulsar.tests.integration.io;
import java.util.Map;
+import lombok.Getter;
import org.testcontainers.containers.GenericContainer;
import org.testng.collections.Maps;
/**
* A tester used for testing a specific source.
*/
+@Getter
public abstract class SourceTester {
protected final String sourceType;
@@ -35,18 +37,18 @@ protected SourceTester(String sourceType) {
this.sourceConfig = Maps.newHashMap();
}
- protected abstract Map<String, GenericContainer<?>>
newSourceService(String clusterName);
+ public abstract void findSourceServiceContainer(Map<String,
GenericContainer<?>> externalServices);
- protected String sourceType() {
+ public String sourceType() {
return sourceType;
}
- protected Map<String, Object> sourceConfig() {
+ public Map<String, Object> sourceConfig() {
return sourceConfig;
}
- protected abstract void prepareSource() throws Exception;
+ public abstract void prepareSource() throws Exception;
- protected abstract Map<String, String> produceSourceMessages(int
numMessages) throws Exception;
+ public abstract Map<String, String> produceSourceMessages(int numMessages)
throws Exception;
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index dd1cb7ae84..1bae448723 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -20,9 +20,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -36,57 +34,18 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
-
-import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.S3Container;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static java.util.stream.Collectors.joining;
-
@Slf4j
-public class TestS3Offload extends PulsarClusterTestBase {
+public class TestS3Offload extends PulsarTieredStorageTestSuite {
private static final int ENTRY_SIZE = 1024;
- private static final int ENTRIES_PER_LEDGER = 1024;
-
- @Override
- @BeforeClass
- public void setupCluster() throws Exception {
-
- final String clusterName = Stream.of(this.getClass().getSimpleName(),
randomName(5))
- .filter(s -> s != null && !s.isEmpty())
- .collect(joining("-"));
-
- PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .numBookies(2)
- .numBrokers(1)
- .externalServices(ImmutableMap.of(S3Container.NAME, new
S3Container(clusterName, S3Container.NAME)))
- .clusterName(clusterName)
- .build();
-
- log.info("Setting up cluster {} with {} bookies, {} brokers",
- spec.clusterName(), spec.numBookies(), spec.numBrokers());
-
- pulsarCluster = PulsarCluster.forSpec(spec);
-
- for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
- brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger",
String.valueOf(ENTRIES_PER_LEDGER));
-
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
- brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
- brokerContainer.withEnv("s3ManagedLedgerOffloadBucket",
"pulsar-integtest");
- brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint",
"http://" + S3Container.NAME + ":9090");
- }
-
- pulsarCluster.start();
-
- log.info("Cluster {} is setup", spec.clusterName());
- }
private static byte[] buildEntry(String pattern) {
byte[] entry = new byte[ENTRY_SIZE];
@@ -98,6 +57,25 @@ public void setupCluster() throws Exception {
return entry;
}
+ private S3Container s3Container;
+
+ @BeforeClass
+ public void setupS3() {
+ s3Container = new S3Container(
+ pulsarCluster.getClusterName(),
+ S3Container.NAME)
+ .withNetwork(pulsarCluster.getNetwork())
+ .withNetworkAliases(S3Container.NAME);
+ s3Container.start();
+ }
+
+ @AfterClass
+ public void teardownS3() {
+ if (null != s3Container) {
+ s3Container.stop();
+ }
+ }
+
@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String
adminUrl) throws Exception {
final String tenant = "s3-offload-test-cli-" + randomName(4);
@@ -350,4 +328,5 @@ public void testPublishOffloadAndConsumeDeletionLag(String
serviceUrl, String ad
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
}
+
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
deleted file mode 100644
index b22bd7476b..0000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.smoke;
-
-import static org.testng.Assert.assertEquals;
-
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.TimeUnit;
-
-public class SmokeTest extends PulsarClusterTestBase {
-
- private final static String clusterName = "test";
-
- @Test(dataProvider = "ServiceUrls")
- public void testPublishAndConsume(String serviceUrl) throws Exception {
-
- this.createTenantName("smoke-test", clusterName, "smoke-admin");
- pulsarCluster.createNamespace(clusterName);
- String topic = "persistent://smoke-test/test/ns1/topic1";
-
- @Cleanup
- PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
-
- @Cleanup
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(topic)
- .subscriptionName("test-sub")
- .ackTimeout(10, TimeUnit.SECONDS)
- .subscriptionType(SubscriptionType.Exclusive)
- .subscribe();
-
- @Cleanup
- Producer<String> producer = client.newProducer(Schema.STRING)
- .topic(topic)
- .enableBatching(false)
- .producerName("effectively-once-producer")
- .initialSequenceId(1L)
- .create();
-
- for (int i = 0; i < 10; i++) {
- producer.send(("smoke-message" + i));
- }
- for (int i = 0; i < 10; i++) {
- Message m = consumer.receive();
- assertEquals("smoke-message" + i, new String(m.getData()));
- }
- }
-
- private ContainerExecResult createTenantName(String tenantName,
- String clusterName,
- String roleName) throws
Exception {
- ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
- "tenants", "create", tenantName, "--allowed-clusters", clusterName,
- "--admin-roles", roleName);
- assertEquals(0, result.getExitCode());
- return result;
- }
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
new file mode 100644
index 0000000000..e20a933e4d
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.suites;
+
+import java.util.Map;
+import org.apache.pulsar.tests.integration.containers.CassandraContainer;
+import
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.collections.Maps;
+
+public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
+
+ @BeforeSuite
+ @Override
+ public void setupCluster() throws Exception {
+ super.setupCluster();
+ }
+
+ @AfterSuite
+ @Override
+ public void tearDownCluster() {
+ super.tearDownCluster();
+ }
+
+ @Override
+ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
PulsarClusterSpecBuilder specBuilder) {
+ PulsarClusterSpecBuilder builder =
super.beforeSetupCluster(clusterName, specBuilder);
+
+ // start functions
+
+ // register external services
+ Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+ final String kafkaServiceName = "kafka";
+ externalServices.put(
+ kafkaServiceName,
+ new KafkaContainer()
+ .withEmbeddedZookeeper()
+ .withNetworkAliases(kafkaServiceName)
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
+ .withName(kafkaServiceName)
+ .withHostName(clusterName + "-" + kafkaServiceName)));
+ final String cassandraServiceName = "cassandra";
+ externalServices.put(
+ cassandraServiceName,
+ new CassandraContainer(clusterName));
+ builder = builder.externalServices(externalServices);
+
+ return builder;
+ }
+
+ @Override
+ public String getTestName() {
+ return "pulsar-test-suite";
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
new file mode 100644
index 0000000000..d248dce1c9
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.suites;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.stream.Stream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+@Slf4j
+public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase
implements ITest {
+
+ protected static final int ENTRIES_PER_LEDGER = 1024;
+
+ @BeforeSuite
+ @Override
+ public void setupCluster() throws Exception {
+ final String clusterName = Stream.of(this.getClass().getSimpleName(),
randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
+
+ PulsarClusterSpec spec = PulsarClusterSpec.builder()
+ .numBookies(2)
+ .numBrokers(1)
+ .clusterName(clusterName)
+ .build();
+
+ log.info("Setting up cluster {} with {} bookies, {} brokers",
+ spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+ pulsarCluster = PulsarCluster.forSpec(spec);
+
+ for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
+ brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger",
String.valueOf(ENTRIES_PER_LEDGER));
+
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+ brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
+ brokerContainer.withEnv("s3ManagedLedgerOffloadBucket",
"pulsar-integtest");
+ brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint",
"http://" + S3Container.NAME + ":9090");
+ }
+
+ pulsarCluster.start();
+
+ log.info("Cluster {} is setup", spec.clusterName());
+ }
+
+ @AfterSuite
+ @Override
+ public void tearDownCluster() {
+ super.tearDownCluster();
+ }
+
+ @Override
+ public String getTestName() {
+ return "tiered-storage-test-suite";
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 2f1653a9b4..8ff1e3231d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -29,7 +29,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -78,6 +77,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
private final Map<String, BrokerContainer> brokerContainers;
private final Map<String, WorkerContainer> workerContainers;
private final ProxyContainer proxyContainer;
+ private Map<String, GenericContainer<?>> externalServices =
Collections.emptyMap();
private PulsarCluster(PulsarClusterSpec spec) {
@@ -148,6 +148,14 @@ public String getZKConnString() {
return zkContainer.getContainerIpAddress() + ":" +
zkContainer.getMappedPort(ZK_PORT);
}
+ public Network getNetwork() {
+ return network;
+ }
+
+ public Map<String, GenericContainer<?>> getExternalServices() {
+ return externalServices;
+ }
+
public void start() throws Exception {
// start the local zookeeper
zkContainer.start();
@@ -178,20 +186,8 @@ public void start() throws Exception {
log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
log.info("\tHttp Service Url : {}", getHttpServiceUrl());
- // start function workers
- if (spec.numFunctionWorkers() > 0) {
- switch (spec.functionRuntimeType()) {
- case THREAD:
-
startFunctionWorkersWithThreadContainerFactory(spec.numFunctionWorkers());
- break;
- case PROCESS:
-
startFunctionWorkersWithProcessContainerFactory(spec.numFunctionWorkers());
- break;
- }
- }
-
// start external services
- final Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
+ this.externalServices = spec.externalServices;
if (null != externalServices) {
externalServices.entrySet().parallelStream().forEach(service -> {
GenericContainer<?> serviceContainer = service.getValue();
@@ -206,7 +202,6 @@ public void start() throws Exception {
private static <T extends PulsarContainer> Map<String, T>
runNumContainers(String serviceName,
int numContainers,
Function<String, T> containerCreator) {
- List<CompletableFuture<?>> startFutures = Lists.newArrayList();
Map<String, T> containers = Maps.newTreeMap();
for (int i = 0; i < numContainers; i++) {
String name = "pulsar-" + serviceName + "-" + i;
@@ -216,8 +211,7 @@ public void start() throws Exception {
return containers;
}
- public void stop() {
-
+ public synchronized void stop() {
Stream<GenericContainer> containers = Streams.concat(
workerContainers.values().stream(),
brokerContainers.values().stream(),
@@ -238,11 +232,22 @@ public void stop() {
}
}
- private void startFunctionWorkersWithProcessContainerFactory(int
numFunctionWorkers) {
+ public synchronized void setupFunctionWorkers(String suffix,
FunctionRuntimeType runtimeType, int numFunctionWorkers) {
+ switch (runtimeType) {
+ case THREAD:
+ startFunctionWorkersWithThreadContainerFactory(suffix,
numFunctionWorkers);
+ break;
+ case PROCESS:
+ startFunctionWorkersWithProcessContainerFactory(suffix,
numFunctionWorkers);
+ break;
+ }
+ }
+
+ private void startFunctionWorkersWithProcessContainerFactory(String
suffix, int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
- "functions-worker",
+ "functions-worker-process-" + suffix,
numFunctionWorkers,
(name) -> new WorkerContainer(clusterName, name)
.withNetwork(network)
@@ -263,11 +268,11 @@ private void
startFunctionWorkersWithProcessContainerFactory(int numFunctionWork
this.startWorkers();
}
- private void startFunctionWorkersWithThreadContainerFactory(int
numFunctionWorkers) {
+ private void startFunctionWorkersWithThreadContainerFactory(String suffix,
int numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
- "functions-worker",
+ "functions-worker-thread-" + suffix,
numFunctionWorkers,
(name) -> new WorkerContainer(clusterName, name)
.withNetwork(network)
@@ -289,17 +294,38 @@ private void
startFunctionWorkersWithThreadContainerFactory(int numFunctionWorke
this.startWorkers();
}
- private void startWorkers() {
+ public synchronized void startWorkers() {
// Start workers that have been initialized
workerContainers.values().parallelStream().forEach(WorkerContainer::start);
log.info("Successfully started {} worker conntainers.",
workerContainers.size());
}
+ public synchronized void stopWorkers() {
+ // Stop workers that have been initialized
+
workerContainers.values().parallelStream().forEach(WorkerContainer::stop);
+ workerContainers.clear();
+ }
+
+ public void startContainers(Map<String, GenericContainer<?>> containers) {
+ containers.forEach((name, container) -> {
+ container
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ .start();
+ log.info("Successfully start container {}.", name);
+ });
+ }
+
+ public void stopContainers(Map<String, GenericContainer<?>> containers) {
+ containers.values().parallelStream().forEach(GenericContainer::stop);
+ log.info("Successfully stop containers : {}", containers);
+ }
+
public BrokerContainer getAnyBroker() {
return getAnyContainer(brokerContainers, "broker");
}
- public WorkerContainer getAnyWorker() {
+ public synchronized WorkerContainer getAnyWorker() {
return getAnyContainer(workerContainers, "functions-worker");
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index e2e89dc762..6181192db0 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -19,8 +19,6 @@
package org.apache.pulsar.tests.integration.topologies;
import lombok.extern.slf4j.Slf4j;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import java.util.concurrent.ThreadLocalRandom;
@@ -70,7 +68,6 @@
protected static PulsarCluster pulsarCluster;
- @BeforeClass
public void setupCluster() throws Exception {
this.setupCluster("");
}
@@ -92,17 +89,23 @@ public void setupCluster(String namePrefix) throws
Exception {
return specBuilder;
}
+ protected void beforeStartCluster() throws Exception {
+ // no-op
+ }
+
protected void setupCluster(PulsarClusterSpec spec) throws Exception {
log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
pulsarCluster = PulsarCluster.forSpec(spec);
+
+ beforeStartCluster();
+
pulsarCluster.start();
log.info("Cluster {} is setup", spec.clusterName());
}
- @AfterClass
public void tearDownCluster() {
if (null != pulsarCluster) {
pulsarCluster.stop();
diff --git a/tests/integration/src/test/resources/pulsar-process.xml
b/tests/integration/src/test/resources/pulsar-process.xml
new file mode 100644
index 0000000000..482e570a67
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-process.xml
@@ -0,0 +1,30 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Process Function Worker) Integration Tests" verbose="2"
annotations="JDK">
+ <test name="pulsar-test-suite" preserve-order="true" >
+ <classes>
+ <class name="org.apache.pulsar.tests.integration.cli.CLITest" />
+ <class
name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
+ <class
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest"
/>
+ </classes>
+ </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-thread.xml
b/tests/integration/src/test/resources/pulsar-thread.xml
new file mode 100644
index 0000000000..14a908f1ea
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-thread.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Thread Function Worker) Integration Tests" verbose="2"
annotations="JDK">
+ <test name="pulsar-thread-test-suite" preserve-order="true">
+ <classes>
+ <class
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsThreadTest"
/>
+ </classes>
+ </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar.xml
b/tests/integration/src/test/resources/pulsar.xml
new file mode 100644
index 0000000000..ac21f4cb18
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -0,0 +1,30 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<!-- TODO: we have to put suite files in one file to avoid executing TESTNG
test suites multiple times.
+ see {@link https://github.com/cbeust/testng/issues/508} -->
+<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
+ <suite-files>
+ <suite-file path="./pulsar-process.xml" />
+ <suite-file path="./pulsar-thread.xml" />
+ <suite-file path="./tiered-storage.xml" />
+ </suite-files>
+</suite>
diff --git a/tests/integration/src/test/resources/tiered-storage.xml
b/tests/integration/src/test/resources/tiered-storage.xml
new file mode 100644
index 0000000000..8cbdaa712b
--- /dev/null
+++ b/tests/integration/src/test/resources/tiered-storage.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Tiered Storage) Integration Tests" verbose="2"
annotations="JDK">
+ <test name="tiered-storage-test-suite" preserve-order="true">
+ <classes>
+ <class
name="org.apache.pulsar.tests.integration.offload.TestS3Offload" />
+ </classes>
+ </test>
+</suite>
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services