This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d248cee  [Integration Tests] Improve and fix integration tests (#2356)
d248cee is described below

commit d248cee810c311c201035c6b2ae4d255ea3a54b4
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Aug 10 15:46:00 2018 -0700

    [Integration Tests] Improve and fix integration tests (#2356)
    
    * [Integration Tests] Improve and fix integration tests
    
     ### Motivation
    
    Integration tests are basically not working in Apache CI. There are 
multiple reasons:
    
    1. PULSAR_MEM is set very high (~2G). If we start more than 6 
containers, memory usage quickly goes up to ~12GB.
    2. We start containers for each test, which is very inefficient.
    3. There is a regression from #2214. #2214 tried to listen on a hostname 
that is configured in the worker config,
      but that hostname is internal to the docker environment.
    
     ### Changes
    
    1. Set PULSAR_MEM to use no more than 128M.
    2. Switch to using test suites, starting containers only once where 
possible and using them across test suites,
       so we only start the cluster once for a set of tests. Hence reorganize those 
tests and get rid
       of dataProvider, which doesn't work well with the current approach.
    3. Remove binding to hostname in WorkerServer, so it binds to all 
interfaces.
    
    * remove suite xml
    
    * Avoid running testng suites multiple times
---
 conf/pulsar_env.sh                                 |   2 +-
 conf/pulsar_tools_env.sh                           |   2 +-
 .../pulsar/functions/worker/rest/WorkerServer.java |   3 -
 .../latest-version-image/conf/bookie.conf          |   4 +-
 .../latest-version-image/conf/broker.conf          |   2 +-
 .../latest-version-image/conf/global-zk.conf       |   2 +-
 .../latest-version-image/conf/local-zk.conf        |   2 +-
 .../latest-version-image/conf/proxy.conf           |   2 +-
 tests/integration/pom.xml                          |   4 +
 .../pulsar/tests/integration/cli/CLITest.java      |  11 +-
 .../integration/compaction/TestCompaction.java     | 118 ++--
 .../integration/containers/PulsarContainer.java    |  21 +-
 .../integration/containers/WorkerContainer.java    |   8 +-
 ...sarFunctionsTest.java => FunctionsCLITest.java} |   3 +-
 .../PulsarFunctionsProcessTest.java}               |  16 +-
 .../integration/functions/PulsarFunctionsTest.java | 612 +++++++++++++++++++--
 .../functions/PulsarFunctionsTestBase.java         |  27 +-
 .../PulsarFunctionsThreadTest.java}                |  16 +-
 .../runtime/PulsarFunctionsRuntimeTest.java        | 194 -------
 .../tests/integration/io/CassandraSinkTester.java  |  19 +-
 .../tests/integration/io/KafkaSinkTester.java      |  21 +-
 .../tests/integration/io/KafkaSourceTester.java    |  21 +-
 .../tests/integration/io/PulsarIOSinkTest.java     | 274 ---------
 .../tests/integration/io/PulsarIOSourceTest.java   | 270 ---------
 .../pulsar/tests/integration/io/SinkTester.java    |  14 +-
 .../pulsar/tests/integration/io/SourceTester.java  |  12 +-
 .../tests/integration/offload/TestS3Offload.java   |  67 +--
 .../pulsar/tests/integration/smoke/SmokeTest.java  |  86 ---
 .../tests/integration/suites/PulsarTestSuite.java  |  76 +++
 .../suites/PulsarTieredStorageTestSuite.java       |  80 +++
 .../integration/topologies/PulsarCluster.java      |  72 ++-
 .../topologies/PulsarClusterTestBase.java          |  11 +-
 .../src/test/resources/pulsar-process.xml          |  30 +
 .../src/test/resources/pulsar-thread.xml           |  28 +
 tests/integration/src/test/resources/pulsar.xml    |  30 +
 .../src/test/resources/tiered-storage.xml          |  28 +
 36 files changed, 1120 insertions(+), 1068 deletions(-)

diff --git a/conf/pulsar_env.sh b/conf/pulsar_env.sh
index f600e6d..f21d1dd 100644
--- a/conf/pulsar_env.sh
+++ b/conf/pulsar_env.sh
@@ -42,7 +42,7 @@
 # PULSAR_GLOBAL_ZK_CONF=
 
 # Extra options to be passed to the jvm
-PULSAR_MEM=" -Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"
+PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
 
 # Garbage collection options
 PULSAR_GC=" -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled 
-XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis 
-XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 
-XX:+DisableExplicitGC -XX:-ResizePLAB"
diff --git a/conf/pulsar_tools_env.sh b/conf/pulsar_tools_env.sh
index 7ad022f..38e2859 100755
--- a/conf/pulsar_tools_env.sh
+++ b/conf/pulsar_tools_env.sh
@@ -42,7 +42,7 @@
 # PULSAR_GLOBAL_ZK_CONF=
 
 # Extra options to be passed to the jvm
-PULSAR_MEM=${PULSAR_MEM:-"-Xmx256m -XX:MaxDirectMemorySize=256m"}
+PULSAR_MEM=${PULSAR_TOOLS_MEM:-"-Xmx128m -XX:MaxDirectMemorySize=128m"}
 
 # Garbage collection options
 PULSAR_GC=" -client "
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 79b9093..3e9a426 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -26,7 +26,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -90,7 +89,6 @@ public class WorkerServer {
         List<ServerConnector> connectors = new ArrayList<>();
         ServerConnector connector = new ServerConnector(server, 1, 1);
         connector.setPort(this.workerConfig.getWorkerPort());
-        connector.setHost(this.workerConfig.getWorkerHostname());
         connectors.add(connector);
 
         List<Handler> handlers = new ArrayList<>(3);
@@ -114,7 +112,6 @@ public class WorkerServer {
                         
this.workerConfig.isTlsRequireTrustedClientCertOnConnect());
                 ServerConnector tlsConnector = new ServerConnector(server, 1, 
1, sslCtxFactory);
                 tlsConnector.setPort(this.workerConfig.getWorkerPortTls());
-                tlsConnector.setHost(this.workerConfig.getWorkerHostname());
                 connectors.add(tlsConnector);
             } catch (GeneralSecurityException e) {
                 throw new RuntimeException(e);
diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf 
b/tests/docker-images/latest-version-image/conf/bookie.conf
index 030d6ad..b9dc405 100644
--- a/tests/docker-images/latest-version-image/conf/bookie.conf
+++ b/tests/docker-images/latest-version-image/conf/bookie.conf
@@ -22,5 +22,7 @@ autostart=false
 redirect_stderr=true
 stdout_logfile=/var/log/pulsar/bookie.log
 directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
+environment=dbStorage_writeCacheMaxSizeMb=16
+environment=dbStorage_readAheadCacheMaxSizeMb=16
 command=/pulsar/bin/pulsar bookie
diff --git a/tests/docker-images/latest-version-image/conf/broker.conf 
b/tests/docker-images/latest-version-image/conf/broker.conf
index 5492abf..e5b0a03 100644
--- a/tests/docker-images/latest-version-image/conf/broker.conf
+++ b/tests/docker-images/latest-version-image/conf/broker.conf
@@ -22,6 +22,6 @@ autostart=false
 redirect_stderr=true
 stdout_logfile=/var/log/pulsar/broker.log
 directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
 command=/pulsar/bin/pulsar broker
 
diff --git a/tests/docker-images/latest-version-image/conf/global-zk.conf 
b/tests/docker-images/latest-version-image/conf/global-zk.conf
index 5c6edaa..d5556c7 100644
--- a/tests/docker-images/latest-version-image/conf/global-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/global-zk.conf
@@ -22,6 +22,6 @@ autostart=false
 redirect_stderr=true
 stdout_logfile=/var/log/pulsar/global-zk.log
 directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
 command=/pulsar/bin/pulsar configuration-store
 
diff --git a/tests/docker-images/latest-version-image/conf/local-zk.conf 
b/tests/docker-images/latest-version-image/conf/local-zk.conf
index 2822cb1..c64dee3 100644
--- a/tests/docker-images/latest-version-image/conf/local-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/local-zk.conf
@@ -22,6 +22,6 @@ autostart=false
 redirect_stderr=true
 stdout_logfile=/var/log/pulsar/local-zk.log
 directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
 command=/pulsar/bin/pulsar zookeeper
 
diff --git a/tests/docker-images/latest-version-image/conf/proxy.conf 
b/tests/docker-images/latest-version-image/conf/proxy.conf
index b812750..dc54dd9 100644
--- a/tests/docker-images/latest-version-image/conf/proxy.conf
+++ b/tests/docker-images/latest-version-image/conf/proxy.conf
@@ -22,6 +22,6 @@ autostart=false
 redirect_stderr=true
 stdout_logfile=/var/log/pulsar/proxy.log
 directory=/pulsar
-environment=PULSAR_MEM=-Xms128M
+environment=PULSAR_MEM=-Xmx128M
 command=/pulsar/bin/pulsar proxy
 
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index c1e61f6..a3a4694 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -131,6 +131,10 @@
               -Dio.netty.leakDetectionLevel=advanced
               </argLine>
               <skipTests>false</skipTests>
+              <suiteXmlFiles>
+                <file>src/test/resources/pulsar.xml</file>
+              </suiteXmlFiles>
+              <forkCount>1</forkCount>
             </configuration>
           </plugin>
         </plugins>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index acdae26..e30a1e3 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,23 +18,21 @@
  */
 package org.apache.pulsar.tests.integration.cli;
 
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
 import org.testng.annotations.Test;
 
 /**
  * Test Pulsar CLI.
  */
-public class CLITest extends PulsarClusterTestBase {
+public class CLITest extends PulsarTestSuite {
 
     @Test
     public void testDeprecatedCommands() throws Exception {
@@ -68,7 +66,7 @@ public class CLITest extends PulsarClusterTestBase {
         for (BrokerContainer container : pulsarCluster.getBrokers()) {
             ContainerExecResult result = container.execCmd(
                 PulsarCluster.ADMIN_SCRIPT,
-                "persistent",
+                "topics",
                 "create-subscription",
                 "persistent://public/default/" + topic,
                 "--subscription",
@@ -98,7 +96,7 @@ public class CLITest extends PulsarClusterTestBase {
         // terminate the topic
         result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
-            "persistent",
+            "topics",
             "terminate",
             topicName);
         assertTrue(result.getStdout().contains("Topic succesfully terminated 
at"));
@@ -209,4 +207,5 @@ public class CLITest extends PulsarClusterTestBase {
             result.getStdout().contains("\"retentionSizeInMB\" : -1"),
             result.getStdout());
     }
+
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 05d6235..fb76202 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -18,18 +18,21 @@
  */
 package org.apache.pulsar.tests.integration.compaction;
 
+import static org.testng.Assert.assertEquals;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-
-public class TestCompaction extends PulsarClusterTestBase {
+/**
+ * Test cases for compaction.
+ */
+public class TestCompaction extends PulsarTestSuite {
 
     @Test(dataProvider = "ServiceUrls")
     public void testPublishCompactAndConsumeCLI(String serviceUrl) throws 
Exception {
@@ -45,29 +48,42 @@ public class TestCompaction extends PulsarClusterTestBase {
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
 
-            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+            try(Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic).create()) {
+                producer.newMessage()
+                    .key("key0")
+                    .value("content0")
+                    .send();
+                producer.newMessage()
+                    .key("key0")
+                    .value("content1")
+                    .send();
             }
 
-            try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
-                    .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
-                Message<byte[]> m = consumer.receive();
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .readCompacted(true)
+                .subscriptionName("sub1")
+                .subscribe()) {
+                Message<String> m = consumer.receive();
                 assertEquals(m.getKey(), "key0");
-                assertEquals(m.getData(), "content0".getBytes());
+                assertEquals(m.getValue(), "content0");
 
                 m = consumer.receive();
                 assertEquals(m.getKey(), "key0");
-                assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getValue(), "content1");
             }
 
             pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", 
"-t", topic);
 
-            try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
-                    .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
-                Message<byte[]> m = consumer.receive();
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .readCompacted(true)
+                .subscriptionName("sub1")
+                .subscribe()) {
+                Message<String> m = consumer.receive();
                 assertEquals(m.getKey(), "key0");
-                assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getValue(), "content1");
             }
         }
     }
@@ -89,32 +105,38 @@ public class TestCompaction extends PulsarClusterTestBase {
         try (PulsarClient client = PulsarClient.create(serviceUrl)) {
             
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
 
-            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+            try(Producer<String> producer = 
client.newProducer(Schema.STRING).topic(topic).create()) {
+                producer.newMessage()
+                    .key("key0")
+                    .value("content0")
+                    .send();
+                producer.newMessage()
+                    .key("key0")
+                    .value("content1")
+                    .send();
             }
 
-            try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+            try (Consumer<String> consumer = 
client.newConsumer(Schema.STRING).topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
-                Message<byte[]> m = consumer.receive();
+                Message<String> m = consumer.receive();
                 assertEquals(m.getKey(), "key0");
-                assertEquals(m.getData(), "content0".getBytes());
+                assertEquals(m.getValue(), "content0");
 
                 m = consumer.receive();
                 assertEquals(m.getKey(), "key0");
-                assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getValue(), "content1");
             }
-            pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
                     "compact", topic);
 
-            pulsarCluster.runAdminCommandOnAnyBroker("persistent",
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
                 "compaction-status", "-w", topic);
 
-            try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+            try (Consumer<String> consumer = 
client.newConsumer(Schema.STRING).topic(topic)
                     .readCompacted(true).subscriptionName("sub1").subscribe()) 
{
-                Message<byte[]> m = consumer.receive();
+                Message<String> m = consumer.receive();
                 assertEquals(m.getKey(), "key0");
-                assertEquals(m.getData(), "content1".getBytes());
+                assertEquals(m.getValue(), "content1");
             }
         }
     }
@@ -122,21 +144,21 @@ public class TestCompaction extends PulsarClusterTestBase 
{
     private static void waitAndVerifyCompacted(PulsarClient client, String 
topic,
                                                String sub, String expectedKey, 
String expectedValue) throws Exception {
         for (int i = 0; i < 60; i++) {
-            try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+            try (Consumer<String> consumer = 
client.newConsumer(Schema.STRING).topic(topic)
                  .readCompacted(true).subscriptionName(sub).subscribe()) {
-                Message<byte[]> m = consumer.receive();
+                Message<String> m = consumer.receive();
                 assertEquals(m.getKey(), expectedKey);
-                if (new String(m.getData()).equals(expectedValue)) {
+                if (expectedValue.equals(m.getValue())) { // compare by value; == on String only checks reference identity
                     break;
                 }
             }
             Thread.sleep(1000);
         }
-        try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+        try (Consumer<String> consumer = 
client.newConsumer(Schema.STRING).topic(topic)
                 .readCompacted(true).subscriptionName(sub).subscribe()) {
-            Message<byte[]> m = consumer.receive();
+            Message<String> m = consumer.receive();
             assertEquals(m.getKey(), expectedKey);
-            assertEquals(new String(m.getData()), expectedValue);
+            assertEquals(m.getValue(), expectedValue);
         }
     }
 
@@ -154,18 +176,27 @@ public class TestCompaction extends PulsarClusterTestBase 
{
         pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
                 "set-compaction-threshold", "--threshold", "1", namespace);
 
-        try (PulsarClient client = PulsarClient.create(serviceUrl)) {
-            
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
-
-            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+            
client.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub1").subscribe().close();
+
+            try(Producer<String> producer = 
client.newProducer(Schema.STRING).topic(topic).create()) {
+                producer.newMessage()
+                    .key("key0")
+                    .value("content0")
+                    .send();
+                producer.newMessage()
+                    .key("key0")
+                    .value("content1")
+                    .send();
             }
 
             waitAndVerifyCompacted(client, topic, "sub1", "key0", "content1");
 
-            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
-                
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+            try(Producer<String> producer = 
client.newProducer(Schema.STRING).topic(topic).create()) {
+                producer.newMessage()
+                    .key("key0")
+                    .value("content2")
+                    .send();
             }
             waitAndVerifyCompacted(client, topic, "sub1", "key0", "content2");
         }
@@ -191,4 +222,5 @@ public class TestCompaction extends PulsarClusterTestBase {
         return result;
     }
 
+
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 2971c59..ad77b62 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 
 /**
  * Abstract Test Container for Pulsar.
@@ -46,6 +47,7 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
     private final String serviceEntryPoint;
     private final int servicePort;
     private final int httpPort;
+    private final String httpPath;
 
     public PulsarContainer(String clusterName,
                            String hostname,
@@ -53,12 +55,23 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
                            String serviceEntryPoint,
                            int servicePort,
                            int httpPort) {
+        this(clusterName, hostname, serviceName, serviceEntryPoint, 
servicePort, httpPort, "/metrics");
+    }
+
+    public PulsarContainer(String clusterName,
+                           String hostname,
+                           String serviceName,
+                           String serviceEntryPoint,
+                           int servicePort,
+                           int httpPort,
+                           String httpPath) {
         super(clusterName, IMAGE_NAME);
         this.hostname = hostname;
         this.serviceName = serviceName;
         this.serviceEntryPoint = serviceEntryPoint;
         this.servicePort = servicePort;
         this.httpPort = httpPort;
+        this.httpPath = httpPath;
     }
 
     @Override
@@ -90,7 +103,13 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
 
     @Override
     public void start() {
-        if (httpPort > 0 || servicePort > 0) {
+        if (httpPort > 0 && servicePort < 0) {
+            this.waitStrategy = new HttpWaitStrategy()
+                .forPort(httpPort)
+                .forStatusCode(200)
+                .forPath(httpPath)
+                .withStartupTimeout(Duration.of(300, SECONDS));
+        } else if (httpPort > 0 || servicePort > 0) {
             this.waitStrategy = new HostPortWaitStrategy()
                 .withStartupTimeout(Duration.of(300, SECONDS));
         }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
index 03e2053..dcc0999 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
@@ -27,6 +27,12 @@ public class WorkerContainer extends 
PulsarContainer<WorkerContainer> {
 
     public WorkerContainer(String clusterName, String hostname) {
         super(
-            clusterName, hostname, hostname, "bin/run-functions-worker.sh", 
-1, BROKER_HTTP_PORT);
+            clusterName,
+            hostname,
+            hostname,
+            "bin/run-functions-worker.sh",
+            -1,
+            BROKER_HTTP_PORT,
+            "/admin/v2/functions/cluster");
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/FunctionsCLITest.java
similarity index 96%
copy from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
copy to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/FunctionsCLITest.java
index a541c09..4989f4f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/FunctionsCLITest.java
@@ -25,11 +25,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import 
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+public class FunctionsCLITest extends PulsarTestSuite {
 
     //
     // Tests on uploading/downloading function packages.
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsProcessTest.java
similarity index 66%
copy from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
copy to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsProcessTest.java
index 03e2053..ec36d52 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsProcessTest.java
@@ -16,17 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.tests.integration.containers;
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 
 /**
- * A pulsar container that runs functions worker.
+ * Process based test.
  */
-public class WorkerContainer extends PulsarContainer<WorkerContainer> {
-
-    public static final String NAME = "pulsar-worker";
-
-    public WorkerContainer(String clusterName, String hostname) {
-        super(
-            clusterName, hostname, hostname, "bin/run-functions-worker.sh", 
-1, BROKER_HTTP_PORT);
+public class PulsarFunctionsProcessTest extends PulsarFunctionsTest {
+    public PulsarFunctionsProcessTest() {
+        super(FunctionRuntimeType.PROCESS);
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index a541c09..57a6ba4 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -20,73 +20,607 @@ package org.apache.pulsar.tests.integration.functions;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.tests.integration.containers.WorkerContainer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import 
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.io.SinkTester;
+import org.apache.pulsar.tests.integration.io.SourceTester;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
 
+/**
+ * A test base for testing sinks, sources and functions.
+ */
 @Slf4j
-public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+
+    PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
+        super(functionRuntimeType);
+    }
+
+    @Test
+    public void testKafkaSink() throws Exception {
+        testSink(new KafkaSinkTester());
+    }
+
+    @Test
+    public void testCassandraSink() throws Exception {
+        testSink(new CassandraSinkTester());
+    }
+
+    private void testSink(SinkTester tester) throws Exception {
+        tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
+
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String inputTopicName = "test-sink-connector-"
+            + functionRuntimeType + "-input-topic-" + randomName(8);
+        final String sinkName = "test-sink-connector-"
+            + functionRuntimeType + "-name-" + randomName(8);
+        final int numMessages = 20;
+
+        // prepare the testing environment for sink
+        prepareSink(tester);
+
+        // submit the sink connector
+        submitSinkConnector(tester, tenant, namespace, sinkName, 
inputTopicName);
+
+        // get sink info
+        getSinkInfoSuccess(tester, tenant, namespace, sinkName);
+
+        // get sink status
+        getSinkStatus(tenant, namespace, sinkName);
+
+        // produce messages
+        Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, 
numMessages);
+
+        // wait for sink to process messages
+        waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+
+        // validate the sink result
+        tester.validateSinkResult(kvs);
+
+        // delete the sink
+        deleteSink(tenant, namespace, sinkName);
+
+        // get sink info (sink should be deleted)
+        getSinkInfoNotFound(tenant, namespace, sinkName);
+    }
+
+    protected void prepareSink(SinkTester tester) throws Exception {
+        tester.prepareSink();
+    }
+
+    protected void submitSinkConnector(SinkTester tester,
+                                       String tenant,
+                                       String namespace,
+                                       String sinkName,
+                                       String inputTopicName) throws Exception 
{
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "sink", "create",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName,
+            "--sink-type", tester.sinkType(),
+            "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+            "--inputs", inputTopicName
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("\"Created successfully\""),
+            result.getStdout());
+    }
+
+    protected void getSinkInfoSuccess(SinkTester tester,
+                                      String tenant,
+                                      String namespace,
+                                      String sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get sink info : {}", result.getStdout());
+        assertTrue(
+            result.getStdout().contains("\"builtin\": \"" + 
tester.getSinkType() + "\""),
+            result.getStdout()
+        );
+    }
+
+    protected void getSinkStatus(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        while (true) {
+            try {
+                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+                log.info("Get sink status : {}", result.getStdout());
+                if (result.getStdout().contains("\"running\": true")) {
+                    return;
+                }
+            } catch (ContainerExecException e) {
+                // expected in early iterations
+            }
+            log.info("Backoff 1 second until the function is running");
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    protected Map<String, String> produceMessagesToInputTopic(String 
inputTopicName,
+                                                              int numMessages) 
throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopicName)
+            .create();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+            String value = "value-" + i;
+            kvs.put(key, value);
+            producer.newMessage()
+                .key(key)
+                .value(value)
+                .send();
+        }
+        return kvs;
+    }
+
+    protected void waitForProcessingMessages(String tenant,
+                                             String namespace,
+                                             String sinkName,
+                                             int numMessages) throws Exception 
{
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (true) {
+            try {
+                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+                log.info("Get sink status : {}", result.getStdout());
+                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                    return;
+                }
+            } catch (ContainerExecException e) {
+                // expected in early iterations
+            }
+
+            log.info("{} ms has elapsed but the sink hasn't process {} 
messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    protected void deleteSink(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "sink",
+            "delete",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("Deleted successfully"),
+            result.getStdout()
+        );
+        assertTrue(
+            result.getStderr().isEmpty(),
+            result.getStderr()
+        );
+    }
+
+    protected void getSinkInfoNotFound(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        try {
+            pulsarCluster.getAnyWorker().execCmd(commands);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Function " 
+ sinkName + " doesn't exist"));
+        }
+    }
 
     //
-    // Tests on uploading/downloading function packages.
+    // Source Test
     //
 
-    public String checkUpload() throws Exception {
-        String bkPkgPath = String.format("%s/%s/%s",
-            "tenant-" + randomName(8),
-            "ns-" + randomName(8),
-            "fn-" + randomName(8));
+    @Test
+    public void testKafkaSource() throws Exception {
+        testSource(new KafkaSourceTester());
+    }
+
+    private void testSource(SourceTester tester)  throws Exception {
+        tester.findSourceServiceContainer(pulsarCluster.getExternalServices());
+
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String outputTopicName = "test-source-connector-"
+            + functionRuntimeType + "-output-topic-" + randomName(8);
+        final String sourceName = "test-source-connector-"
+            + functionRuntimeType + "-name-" + randomName(8);
+        final int numMessages = 20;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopicName)
+            .subscriptionName("source-tester")
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        // prepare the testing environment for source
+        prepareSource(tester);
 
-        UploadDownloadCommandGenerator generator = 
UploadDownloadCommandGenerator.createUploader(
+        // submit the source connector
+        submitSourceConnector(tester, tenant, namespace, sourceName, 
outputTopicName);
+
+        // get source info
+        getSourceInfoSuccess(tester, tenant, namespace, sourceName);
+
+        // get source status
+        getSourceStatus(tenant, namespace, sourceName);
+
+        // produce messages
+        Map<String, String> kvs = tester.produceSourceMessages(numMessages);
+
+        // wait for source to process messages
+        waitForProcessingSourceMessages(tenant, namespace, sourceName, 
numMessages);
+
+        // validate the source result
+        validateSourceResult(consumer, kvs);
+
+        // delete the source
+        deleteSource(tenant, namespace, sourceName);
+
+        // get source info (source should be deleted)
+        getSourceInfoNotFound(tenant, namespace, sourceName);
+    }
+
+    protected void prepareSource(SourceTester tester) throws Exception {
+        tester.prepareSource();
+    }
+
+    protected void submitSourceConnector(SourceTester tester,
+                                         String tenant,
+                                         String namespace,
+                                         String sourceName,
+                                         String outputTopicName) throws 
Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "source", "create",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName,
+            "--source-type", tester.sourceType(),
+            "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
+            "--destinationTopicName", outputTopicName
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("\"Created successfully\""),
+            result.getStdout());
+    }
+
+    protected void getSourceInfoSuccess(SourceTester tester,
+                                        String tenant,
+                                        String namespace,
+                                        String sourceName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get source info : {}", result.getStdout());
+        assertTrue(
+            result.getStdout().contains("\"builtin\": \"" + 
tester.getSourceType() + "\""),
+            result.getStdout()
+        );
+    }
+
+    protected void getSourceStatus(String tenant, String namespace, String 
sourceName) throws Exception {
+        String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            bkPkgPath);
-        String actualCommand = generator.generateCommand();
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        while (true) {
+            try {
+                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+                log.info("Get source status : {}", result.getStdout());
+                if (result.getStdout().contains("\"running\": true")) {
+                    return;
+                }
+            } catch (ContainerExecException e) {
+                // expected for early iterations
+            }
+            log.info("Backoff 1 second until the function is running");
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
 
-        log.info(actualCommand);
+    protected void validateSourceResult(Consumer<String> consumer,
+                                        Map<String, String> kvs) throws 
Exception {
+        for (Map.Entry<String, String> kv : kvs.entrySet()) {
+            Message<String> msg = consumer.receive();
+            assertEquals(kv.getKey(), msg.getKey());
+            assertEquals(kv.getValue(), msg.getValue());
+        }
+    }
+
+    protected void waitForProcessingSourceMessages(String tenant,
+                                                   String namespace,
+                                                   String sourceName,
+                                                   int numMessages) throws 
Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (true) {
+            try {
+                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+                log.info("Get source status : {}", result.getStdout());
+                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                    return;
+                }
+            } catch (ContainerExecException e) {
+                // expected for early iterations
+            }
+            log.info("{} ms has elapsed but the source hasn't process {} 
messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
 
+    protected void deleteSource(String tenant, String namespace, String 
sourceName) throws Exception {
         String[] commands = {
-            "sh", "-c", actualCommand
+            PulsarCluster.ADMIN_SCRIPT,
+            "source",
+            "delete",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
         };
-        ContainerExecResult output = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        assertEquals(0, output.getExitCode());
-        assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
-        return bkPkgPath;
+        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("Delete source successfully"),
+            result.getStdout()
+        );
+        assertTrue(
+            result.getStderr().isEmpty(),
+            result.getStderr()
+        );
+    }
+
+    protected void getSourceInfoNotFound(String tenant, String namespace, 
String sourceName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sourceName
+        };
+        try {
+            pulsarCluster.getAnyWorker().execCmd(commands);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Function " 
+ sourceName + " doesn't exist"));
+        }
+    }
+
+    //
+    // Test CRUD functions on different runtimes.
+    //
+
+    @Test
+    public void testPythonExclamationFunction() throws Exception {
+        testExclamationFunction(Runtime.PYTHON);
     }
 
     @Test
-    public void checkDownload() throws Exception {
-        String bkPkgPath = checkUpload();
-        String localPkgFile = "/tmp/checkdownload-" + randomName(16);
+    public void testJavaExclamationFunction() throws Exception {
+        testExclamationFunction(Runtime.JAVA);
+    }
+
+    private void testExclamationFunction(Runtime runtime) throws Exception {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == 
Runtime.PYTHON) {
+            // python can only run on process mode
+            return;
+        }
+
+        String inputTopicName = "test-exclamation-" + runtime + "-input-" + 
randomName(8);
+        String outputTopicName = "test-exclamation-" + runtime + "-output-" + 
randomName(8);
+        String functionName = "test-exclamation-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the exclamation function
+        submitExclamationFunction(
+            runtime, inputTopicName, outputTopicName, functionName);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, outputTopicName, 
numMessages);
 
-        UploadDownloadCommandGenerator generator = 
UploadDownloadCommandGenerator.createDownloader(
-                localPkgFile,
-                bkPkgPath);
-        String actualCommand = generator.generateCommand();
+        // get function status
+        getFunctionStatus(functionName, numMessages);
 
-        log.info(actualCommand);
+        // delete function
+        deleteFunction(functionName);
 
+        // get function info (function should be deleted)
+        getFunctionInfoNotFound(functionName);
+    }
+
+    private static void submitExclamationFunction(Runtime runtime,
+                                                  String inputTopicName,
+                                                  String outputTopicName,
+                                                  String functionName) throws 
Exception {
+        CommandGenerator generator;
+        generator = CommandGenerator.createDefaultGenerator(inputTopicName, 
getExclamationClass(runtime));
+        generator.setSinkTopic(outputTopicName);
+        generator.setFunctionName(functionName);
+        String command;
+        if (Runtime.JAVA == runtime) {
+            command = generator.generateCreateFunctionCommand();
+        } else if (Runtime.PYTHON == runtime) {
+            generator.setRuntime(runtime);
+            command = 
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
+        } else {
+            throw new IllegalArgumentException("Unsupported runtime : " + 
runtime);
+        }
         String[] commands = {
-            "sh", "-c", actualCommand
+            "sh", "-c", command
         };
-        WorkerContainer container = pulsarCluster.getAnyWorker();
-        ContainerExecResult output = container.execCmd(commands);
-        assertEquals(0, output.getExitCode());
-        assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
-        String[] diffCommand = {
-            "diff",
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            commands);
+        assertTrue(result.getStdout().contains("\"Created successfully\""));
+    }
+
+    private static void getFunctionInfoSuccess(String functionName) throws 
Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,
-            localPkgFile
-        };
-        output = container.execCmd(diffCommand);
-        assertEquals(0, output.getExitCode());
-        assertTrue(output.getStdout().isEmpty());
-        assertTrue(output.getStderr().isEmpty());
+            "functions",
+            "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + functionName + 
"\""));
     }
 
+    private static void getFunctionInfoNotFound(String functionName) throws 
Exception {
+        try {
+            pulsarCluster.getAnyWorker().execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "functions",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", functionName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Function " 
+ functionName + " doesn't exist"));
+        }
+    }
 
+    private static void getFunctionStatus(String functionName, int 
numMessages) throws Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("\"running\": true"));
+        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\""));
+        assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": 
\"" + numMessages + "\""));
+    }
+
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages) throws 
Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals("message-" + i + "!", msg.getValue());
+        }
+    }
+
+    private static void deleteFunction(String functionName) throws Exception {
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "delete",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        assertTrue(result.getStdout().contains("Deleted successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
 
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8e82662..fe4795d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -20,16 +20,17 @@ package org.apache.pulsar.tests.integration.functions;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
 /**
  * A cluster to run pulsar functions for testing functions related features.
  */
 @Slf4j
-public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
+public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
 
     @DataProvider(name = "FunctionRuntimeTypes")
     public static Object[][] getData() {
@@ -49,12 +50,20 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarClusterTestBase  {
         this.functionRuntimeType = functionRuntimeType;
     }
 
-    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
-        String clusterName,
-        PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
-        return super.beforeSetupCluster(clusterName, specBuilder)
-            .functionRuntimeType(functionRuntimeType)
-            .numFunctionWorkers(2);
+    @BeforeClass
+    public void setupFunctionWorkers() {
+        final int numFunctionWorkers = 2;
+        log.info("Setting up {} function workers : function runtime type = {}",
+            numFunctionWorkers, functionRuntimeType);
+        pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, 
numFunctionWorkers);
+        log.info("{} function workers has started", numFunctionWorkers);
+    }
+
+    @AfterClass
+    public void teardownFunctionWorkers() {
+        log.info("Tearing down function workers ...");
+        pulsarCluster.stopWorkers();
+        log.info("All functions workers are stopped.");
     }
 
     //
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsThreadTest.java
similarity index 66%
copy from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
copy to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsThreadTest.java
index 03e2053..d1c132b 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsThreadTest.java
@@ -16,17 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.tests.integration.containers;
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 
 /**
- * A pulsar container that runs functions worker.
+ * Thread based test.
  */
-public class WorkerContainer extends PulsarContainer<WorkerContainer> {
-
-    public static final String NAME = "pulsar-worker";
-
-    public WorkerContainer(String clusterName, String hostname) {
-        super(
-            clusterName, hostname, hostname, "bin/run-functions-worker.sh", 
-1, BROKER_HTTP_PORT);
+public class PulsarFunctionsThreadTest extends PulsarFunctionsTest {
+    public PulsarFunctionsThreadTest() {
+        super(FunctionRuntimeType.THREAD);
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
deleted file mode 100644
index a69d137..0000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.functions.runtime;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
-import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-
-/**
- * The tests that run over different container mode.
- */
-public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
-
-    @Factory(dataProvider = "FunctionRuntimeTypes")
-    PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
-        super(functionRuntimeType);
-    }
-
-    //
-    // Test CRUD functions on different runtimes.
-    //
-
-    @Test(dataProvider = "FunctionRuntimes")
-    public void testExclamationFunction(Runtime runtime) throws Exception {
-        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == 
Runtime.PYTHON) {
-            // python can only run on process mode
-            return;
-        }
-
-        String inputTopicName = "test-exclamation-" + runtime + "-input-" + 
randomName(8);
-        String outputTopicName = "test-exclamation-" + runtime + "-output-" + 
randomName(8);
-        String functionName = "test-exclamation-fn-" + randomName(8);
-        final int numMessages = 10;
-
-        // submit the exclamation function
-        submitExclamationFunction(
-            runtime, inputTopicName, outputTopicName, functionName);
-
-        // get function info
-        getFunctionInfoSuccess(functionName);
-
-        // publish and consume result
-        publishAndConsumeMessages(inputTopicName, outputTopicName, 
numMessages);
-
-        // get function status
-        getFunctionStatus(functionName, numMessages);
-
-        // delete function
-        deleteFunction(functionName);
-
-        // get function info
-        getFunctionInfoNotFound(functionName);
-    }
-
-    private static void submitExclamationFunction(Runtime runtime,
-                                                  String inputTopicName,
-                                                  String outputTopicName,
-                                                  String functionName) throws 
Exception {
-        CommandGenerator generator;
-        generator = CommandGenerator.createDefaultGenerator(inputTopicName, 
getExclamationClass(runtime));
-        generator.setSinkTopic(outputTopicName);
-        generator.setFunctionName(functionName);
-        String command;
-        if (Runtime.JAVA == runtime) {
-            command = generator.generateCreateFunctionCommand();
-        } else if (Runtime.PYTHON == runtime) {
-            generator.setRuntime(runtime);
-            command = 
generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
-        } else {
-            throw new IllegalArgumentException("Unsupported runtime : " + 
runtime);
-        }
-        String[] commands = {
-            "sh", "-c", command
-        };
-        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            commands);
-        assertTrue(result.getStdout().contains("\"Created successfully\""));
-    }
-
-    private static void getFunctionInfoSuccess(String functionName) throws 
Exception {
-        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStdout().contains("\"name\": \"" + functionName + 
"\""));
-    }
-
-    private static void getFunctionInfoNotFound(String functionName) throws 
Exception {
-        try {
-            pulsarCluster.getAnyWorker().execCmd(
-                    PulsarCluster.ADMIN_SCRIPT,
-                    "functions",
-                    "get",
-                    "--tenant", "public",
-                    "--namespace", "default",
-                    "--name", functionName);
-            fail("Command should have exited with non-zero");
-        } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " 
+ functionName + " doesn't exist"));
-        }
-    }
-
-    private static void getFunctionStatus(String functionName, int 
numMessages) throws Exception {
-        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStdout().contains("\"running\": true"));
-        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\""));
-        assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": 
\"" + numMessages + "\""));
-    }
-
-    private static void publishAndConsumeMessages(String inputTopic,
-                                                  String outputTopic,
-                                                  int numMessages) throws 
Exception {
-        @Cleanup PulsarClient client = PulsarClient.builder()
-            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-            .build();
-        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
-            .topic(outputTopic)
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscriptionName("test-sub")
-            .subscribe();
-        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
-            .topic(inputTopic)
-            .create();
-
-        for (int i = 0; i < numMessages; i++) {
-            producer.send("message-" + i);
-        }
-
-        for (int i = 0; i < numMessages; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals("message-" + i + "!", msg.getValue());
-        }
-    }
-
-    private static void deleteFunction(String functionName) throws Exception {
-        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "delete",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
-        );
-        assertTrue(result.getStdout().contains("Deleted successfully"));
-        assertTrue(result.getStderr().isEmpty());
-    }
-
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index 981ffbb..e31de9f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -25,11 +25,11 @@ import com.datastax.driver.core.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.CassandraContainer;
 import org.testcontainers.containers.GenericContainer;
-import org.testng.collections.Maps;
 
 import java.util.List;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -40,6 +40,8 @@ import static org.junit.Assert.assertNotNull;
 @Slf4j
 public class CassandraSinkTester extends SinkTester {
 
+    private static final String NAME = "cassandra";
+
     private static final String ROOTS = "cassandra";
     private static final String KEY = "key";
     private static final String COLUMN = "col";
@@ -67,15 +69,16 @@ public class CassandraSinkTester extends SinkTester {
     }
 
     @Override
-    protected Map<String, GenericContainer<?>> newSinkService(String 
clusterName) {
-        this.cassandraCluster = new CassandraContainer(clusterName);
-        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
-        containers.put(CassandraContainer.NAME, cassandraCluster);
-        return containers;
+    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
+        GenericContainer<?> container = containers.get(NAME);
+        checkState(container instanceof CassandraContainer,
+            "No cassandra service found in the cluster");
+        // checkState rejects a missing (null) or wrong-typed entry, so the cast is safe
+        this.cassandraCluster = (CassandraContainer) container;
     }
 
     @Override
-    protected void prepareSink() {
+    public void prepareSink() {
         // build the sink
         cluster = Cluster.builder()
             .addContactPoint("localhost")
@@ -101,7 +104,7 @@ public class CassandraSinkTester extends SinkTester {
     }
 
     @Override
-    protected void validateSinkResult(Map<String, String> kvs) {
+    public void validateSinkResult(Map<String, String> kvs) {
         String query = "SELECT * FROM " + tableName + ";";
         ResultSet result = session.execute(query);
         List<Row> rows = result.all();
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 2cad433..1cd58f2 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -35,7 +36,6 @@ import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-import org.testng.collections.Maps;
 
 /**
  * A tester for testing kafka sink.
@@ -64,21 +64,16 @@ public class KafkaSinkTester extends SinkTester {
     }
 
     @Override
-    protected Map<String, GenericContainer<?>> newSinkService(String 
clusterName) {
-        this.kafkaContainer = new KafkaContainer()
-            .withEmbeddedZookeeper()
-            .withNetworkAliases(NAME)
-            .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
-                .withName(NAME)
-                .withHostName(clusterName + "-" + NAME));
+    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
+        GenericContainer<?> container = containers.get(NAME);
+        checkState(container instanceof KafkaContainer,
+            "No kafka service found in the cluster");
 
-        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
-        containers.put("kafka", kafkaContainer);
-        return containers;
+        this.kafkaContainer = (KafkaContainer) container;
     }
 
     @Override
-    protected void prepareSink() throws Exception {
+    public void prepareSink() throws Exception {
         ExecResult execResult = kafkaContainer.execInContainer(
             "/usr/bin/kafka-topics",
             "--create",
@@ -108,7 +103,7 @@ public class KafkaSinkTester extends SinkTester {
     }
 
     @Override
-    protected void validateSinkResult(Map<String, String> kvs) {
+    public void validateSinkResult(Map<String, String> kvs) {
         Iterator<Map.Entry<String, String>> kvIter = kvs.entrySet().iterator();
         while (kvIter.hasNext()) {
             ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index a23fba5..4928f00 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertTrue;
 
@@ -37,7 +38,6 @@ import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-import org.testng.collections.Maps;
 
 /**
  * A tester for testing kafka source.
@@ -68,21 +68,16 @@ public class KafkaSourceTester extends SourceTester {
     }
 
     @Override
-    protected Map<String, GenericContainer<?>> newSourceService(String 
clusterName) {
-        this.kafkaContainer = new KafkaContainer()
-            .withEmbeddedZookeeper()
-            .withNetworkAliases(NAME)
-            .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
-                .withName(NAME)
-                .withHostName(clusterName + "-" + NAME));
+    public void findSourceServiceContainer(Map<String, GenericContainer<?>> 
containers) {
+        GenericContainer<?> container = containers.get(NAME);
+        checkState(container instanceof KafkaContainer,
+            "No kafka service found in the cluster");
 
-        Map<String, GenericContainer<?>> containers = Maps.newHashMap();
-        containers.put("kafka", kafkaContainer);
-        return containers;
+        this.kafkaContainer = (KafkaContainer) container;
     }
 
     @Override
-    protected void prepareSource() throws Exception {
+    public void prepareSource() throws Exception {
         ExecResult execResult = kafkaContainer.execInContainer(
             "/usr/bin/kafka-topics",
             "--create",
@@ -112,7 +107,7 @@ public class KafkaSourceTester extends SourceTester {
     }
 
     @Override
-    protected Map<String, String> produceSourceMessages(int numMessages) 
throws Exception{
+    public Map<String, String> produceSourceMessages(int numMessages) throws 
Exception{
         KafkaProducer<String, String> producer = new KafkaProducer<>(
                 ImmutableMap.of(
                         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
deleted file mode 100644
index d1c80ea..0000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import com.google.common.base.Stopwatch;
-import com.google.gson.Gson;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
-import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.GenericContainer;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-/**
- * A test base for testing sink.
- */
-@Slf4j
-public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
-
-    @DataProvider(name = "Sinks")
-    public static Object[][] getData() {
-        return new Object[][] {
-            { FunctionRuntimeType.PROCESS,  new CassandraSinkTester() },
-            { FunctionRuntimeType.THREAD, new CassandraSinkTester() },
-            { FunctionRuntimeType.PROCESS, new KafkaSinkTester() },
-            { FunctionRuntimeType.THREAD, new KafkaSinkTester() }
-        };
-    }
-
-    protected final SinkTester tester;
-
-    @Factory(dataProvider = "Sinks")
-    PulsarIOSinkTest(FunctionRuntimeType functionRuntimeType, SinkTester 
tester) {
-        super(functionRuntimeType);
-        this.tester = tester;
-    }
-
-    @Override
-    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
-                                                          
PulsarClusterSpecBuilder specBuilder) {
-        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-        externalServices.putAll(tester.newSinkService(clusterName));
-        return super.beforeSetupCluster(clusterName, specBuilder)
-            .externalServices(externalServices);
-    }
-
-    @Test
-    public void testSink() throws Exception {
-        final String tenant = TopicName.PUBLIC_TENANT;
-        final String namespace = TopicName.DEFAULT_NAMESPACE;
-        final String inputTopicName = "test-sink-connector-input-topic-" + 
randomName(8);
-        final String sinkName = "test-sink-connector-name-" + randomName(8);
-        final int numMessages = 20;
-
-        // prepare the testing environment for sink
-        prepareSink();
-
-        // submit the sink connector
-        submitSinkConnector(tenant, namespace, sinkName, inputTopicName);
-
-        // get sink info
-        getSinkInfoSuccess(tenant, namespace, sinkName);
-
-        // get sink status
-        getSinkStatus(tenant, namespace, sinkName);
-
-        // produce messages
-        Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, 
numMessages);
-
-        // wait for sink to process messages
-        waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
-
-        // validate the sink result
-        tester.validateSinkResult(kvs);
-
-        // delete the sink
-        deleteSink(tenant, namespace, sinkName);
-
-        // get sink info (sink should be deleted)
-        getSinkInfoNotFound(tenant, namespace, sinkName);
-    }
-
-    protected void prepareSink() throws Exception {
-        tester.prepareSink();
-    }
-
-    protected void submitSinkConnector(String tenant,
-                                       String namespace,
-                                       String sinkName,
-                                       String inputTopicName) throws Exception 
{
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "sink", "create",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName,
-            "--sink-type", tester.sinkType(),
-            "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-            "--inputs", inputTopicName
-        };
-        log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        assertTrue(
-            result.getStdout().contains("\"Created successfully\""),
-            result.getStdout());
-    }
-
-    protected void getSinkInfoSuccess(String tenant, String namespace, String 
sinkName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
-        };
-        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        log.info("Get sink info : {}", result.getStdout());
-        assertTrue(
-            result.getStdout().contains("\"builtin\": \"" + tester.sinkType + 
"\""),
-            result.getStdout()
-        );
-    }
-
-    protected void getSinkStatus(String tenant, String namespace, String 
sinkName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
-        };
-        while (true) {
-            try {
-                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get sink status : {}", result.getStdout());
-                if (result.getStdout().contains("\"running\": true")) {
-                    return;
-                }
-            } catch (ContainerExecException e) {
-                // expected in early iterations
-            }
-            log.info("Backoff 1 second until the function is running");
-            TimeUnit.SECONDS.sleep(1);
-        }
-    }
-
-    protected Map<String, String> produceMessagesToInputTopic(String 
inputTopicName,
-                                                              int numMessages) 
throws Exception {
-        @Cleanup
-        PulsarClient client = PulsarClient.builder()
-            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-            .build();
-        @Cleanup
-        Producer<String> producer = client.newProducer(Schema.STRING)
-            .topic(inputTopicName)
-            .create();
-        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
-        for (int i = 0; i < numMessages; i++) {
-            String key = "key-" + i;
-            String value = "value-" + i;
-            kvs.put(key, value);
-            producer.newMessage()
-                .key(key)
-                .value(value)
-                .send();
-        }
-        return kvs;
-    }
-
-    protected void waitForProcessingMessages(String tenant,
-                                             String namespace,
-                                             String sinkName,
-                                             int numMessages) throws Exception 
{
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
-        };
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        while (true) {
-            try {
-                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get sink status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
-                    return;
-                }
-            } catch (ContainerExecException e) {
-                // expected in early iterations
-            }
-
-            log.info("{} ms has elapsed but the sink hasn't process {} 
messages, backoff to wait for another 1 second",
-                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
-            TimeUnit.SECONDS.sleep(1);
-        }
-    }
-
-    protected void deleteSink(String tenant, String namespace, String 
sinkName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "sink",
-            "delete",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
-        };
-        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        assertTrue(
-            result.getStdout().contains("Deleted successfully"),
-            result.getStdout()
-        );
-        assertTrue(
-            result.getStderr().isEmpty(),
-            result.getStderr()
-        );
-    }
-
-    protected void getSinkInfoNotFound(String tenant, String namespace, String 
sinkName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
-        };
-        try {
-            pulsarCluster.getAnyWorker().execCmd(commands);
-            fail("Command should have exited with non-zero");
-        } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " 
+ sinkName + " doesn't exist"));
-        }
-    }
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
deleted file mode 100644
index 8bd76ef..0000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSourceTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import com.google.common.base.Stopwatch;
-import com.google.gson.Gson;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
-import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
-import org.testcontainers.containers.GenericContainer;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Factory;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-/**
- * A test base for testing source.
- */
-@Slf4j
-public class PulsarIOSourceTest extends PulsarFunctionsTestBase {
-
-    @DataProvider(name = "Sources")
-    public static Object[][] getData() {
-        return new Object[][] {
-            { FunctionRuntimeType.PROCESS, new KafkaSourceTester() },
-            { FunctionRuntimeType.THREAD, new KafkaSourceTester() }
-        };
-    }
-
-    protected final SourceTester tester;
-
-    @Factory(dataProvider = "Sources")
-    PulsarIOSourceTest(FunctionRuntimeType functionRuntimeType, SourceTester 
tester) {
-        super(functionRuntimeType);
-        this.tester = tester;
-    }
-
-    @Override
-    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
-                                                          
PulsarClusterSpecBuilder specBuilder) {
-        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-        externalServices.putAll(tester.newSourceService(clusterName));
-        return super.beforeSetupCluster(clusterName, specBuilder)
-            .externalServices(externalServices);
-    }
-
-    @Test
-    public void testSource() throws Exception {
-        final String tenant = TopicName.PUBLIC_TENANT;
-        final String namespace = TopicName.DEFAULT_NAMESPACE;
-        final String outputTopicName = "test-source-connector-output-topic-" + 
randomName(8);
-        final String sourceName = "test-source-connector-name-" + 
randomName(8);
-        final int numMessages = 20;
-
-        @Cleanup
-        PulsarClient client = PulsarClient.builder()
-            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-            .build();
-
-        @Cleanup
-        Consumer<String> consumer = client.newConsumer(Schema.STRING)
-            .topic(outputTopicName)
-            .subscriptionName("source-tester")
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscribe();
-
-        // prepare the testing environment for source
-        prepareSource();
-
-        // submit the source connector
-        submitSourceConnector(tenant, namespace, sourceName, outputTopicName);
-
-        // get source info
-        getSourceInfoSuccess(tenant, namespace, sourceName);
-
-        // get source status
-        getSourceStatus(tenant, namespace, sourceName);
-
-        // produce messages
-        Map<String, String> kvs = tester.produceSourceMessages(numMessages);
-
-        // wait for source to process messages
-        waitForProcessingMessages(tenant, namespace, sourceName, numMessages);
-
-        // validate the source result
-        validateSourceResult(consumer, kvs);
-
-        // delete the source
-        deleteSource(tenant, namespace, sourceName);
-
-        // get source info (source should be deleted)
-        getSourceInfoNotFound(tenant, namespace, sourceName);
-    }
-
-    protected void prepareSource() throws Exception {
-        tester.prepareSource();
-    }
-
-    protected void submitSourceConnector(String tenant,
-                                       String namespace,
-                                       String sourceName,
-                                       String outputTopicName) throws 
Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "source", "create",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sourceName,
-            "--source-type", tester.sourceType(),
-            "--sourceConfig", new Gson().toJson(tester.sourceConfig()),
-            "--destinationTopicName", outputTopicName
-        };
-        log.info("Run command : {}", StringUtils.join(commands, ' '));
-        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        assertTrue(
-            result.getStdout().contains("\"Created successfully\""),
-            result.getStdout());
-    }
-
-    protected void getSourceInfoSuccess(String tenant, String namespace, 
String sourceName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sourceName
-        };
-        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        log.info("Get source info : {}", result.getStdout());
-        assertTrue(
-            result.getStdout().contains("\"builtin\": \"" + tester.sourceType 
+ "\""),
-            result.getStdout()
-        );
-    }
-
-    protected void getSourceStatus(String tenant, String namespace, String 
sourceName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sourceName
-        };
-        while (true) {
-            try {
-                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get source status : {}", result.getStdout());
-                if (result.getStdout().contains("\"running\": true")) {
-                    return;
-                }
-            } catch (ContainerExecException e) {
-                // expected for early iterations
-            }
-            log.info("Backoff 1 second until the function is running");
-            TimeUnit.SECONDS.sleep(1);
-        }
-    }
-
-    protected void validateSourceResult(Consumer<String> consumer,
-                                        Map<String, String> kvs) throws 
Exception {
-        for (Map.Entry<String, String> kv : kvs.entrySet()) {
-            Message<String> msg = consumer.receive();
-            assertEquals(kv.getKey(), msg.getKey());
-            assertEquals(kv.getValue(), msg.getValue());
-        }
-    }
-
-    protected void waitForProcessingMessages(String tenant,
-                                             String namespace,
-                                             String sourceName,
-                                             int numMessages) throws Exception 
{
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sourceName
-        };
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        while (true) {
-            try {
-                ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-                log.info("Get source status : {}", result.getStdout());
-                if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
-                    return;
-                }
-            } catch (ContainerExecException e) {
-                // expected for early iterations
-            }
-            log.info("{} ms has elapsed but the source hasn't process {} 
messages, backoff to wait for another 1 second",
-                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
-            TimeUnit.SECONDS.sleep(1);
-        }
-    }
-
-    protected void deleteSource(String tenant, String namespace, String 
sourceName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "source",
-            "delete",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sourceName
-        };
-        ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
-        assertTrue(
-            result.getStdout().contains("Delete source successfully"),
-            result.getStdout()
-        );
-        assertTrue(
-            result.getStderr().isEmpty(),
-            result.getStderr()
-        );
-    }
-
-    protected void getSourceInfoNotFound(String tenant, String namespace, 
String sourceName) throws Exception {
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sourceName
-        };
-        try {
-            pulsarCluster.getAnyWorker().execCmd(commands);
-            fail("Command should have exited with non-zero");
-        } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " 
+ sourceName + " doesn't exist"));
-        }
-    }
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 75434be..3eee821 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -19,34 +19,36 @@
 package org.apache.pulsar.tests.integration.io;
 
 import java.util.Map;
+import lombok.Getter;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
 /**
  * A tester used for testing a specific sink.
  */
+@Getter
 public abstract class SinkTester {
 
     protected final String sinkType;
     protected final Map<String, Object> sinkConfig;
 
-    protected SinkTester(String sinkType) {
+    public SinkTester(String sinkType) {
         this.sinkType = sinkType;
         this.sinkConfig = Maps.newHashMap();
     }
 
-    protected abstract Map<String, GenericContainer<?>> newSinkService(String 
clusterName);
+    public abstract void findSinkServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
 
-    protected String sinkType() {
+    public String sinkType() {
         return sinkType;
     }
 
-    protected Map<String, Object> sinkConfig() {
+    public Map<String, Object> sinkConfig() {
         return sinkConfig;
     }
 
-    protected abstract void prepareSink() throws Exception;
+    public abstract void prepareSink() throws Exception;
 
-    protected abstract void validateSinkResult(Map<String, String> kvs);
+    public abstract void validateSinkResult(Map<String, String> kvs);
 
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index a28d086..dc58f2f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -19,12 +19,14 @@
 package org.apache.pulsar.tests.integration.io;
 
 import java.util.Map;
+import lombok.Getter;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
 /**
  * A tester used for testing a specific source.
  */
+@Getter
 public abstract class SourceTester {
 
     protected final String sourceType;
@@ -35,18 +37,18 @@ public abstract class SourceTester {
         this.sourceConfig = Maps.newHashMap();
     }
 
-    protected abstract Map<String, GenericContainer<?>> 
newSourceService(String clusterName);
+    public abstract void findSourceServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
 
-    protected String sourceType() {
+    public String sourceType() {
         return sourceType;
     }
 
-    protected Map<String, Object> sourceConfig() {
+    public Map<String, Object> sourceConfig() {
         return sourceConfig;
     }
 
-    protected abstract void prepareSource() throws Exception;
+    public abstract void prepareSource() throws Exception;
 
-    protected abstract Map<String, String> produceSourceMessages(int 
numMessages) throws Exception;
+    public abstract Map<String, String> produceSourceMessages(int numMessages) 
throws Exception;
 
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index dd1cb7a..1bae448 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -20,9 +20,7 @@ package org.apache.pulsar.tests.integration.offload;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
-import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -36,57 +34,18 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
-
-import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.S3Container;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
 
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static java.util.stream.Collectors.joining;
-
 @Slf4j
-public class TestS3Offload extends PulsarClusterTestBase {
+public class TestS3Offload extends PulsarTieredStorageTestSuite {
 
     private static final int ENTRY_SIZE = 1024;
-    private static final int ENTRIES_PER_LEDGER = 1024;
-
-    @Override
-    @BeforeClass
-    public void setupCluster() throws Exception {
-
-        final String clusterName = Stream.of(this.getClass().getSimpleName(), 
randomName(5))
-                .filter(s -> s != null && !s.isEmpty())
-                .collect(joining("-"));
-
-        PulsarClusterSpec spec = PulsarClusterSpec.builder()
-            .numBookies(2)
-            .numBrokers(1)
-            .externalServices(ImmutableMap.of(S3Container.NAME, new 
S3Container(clusterName, S3Container.NAME)))
-            .clusterName(clusterName)
-            .build();
-
-        log.info("Setting up cluster {} with {} bookies, {} brokers",
-                spec.clusterName(), spec.numBookies(), spec.numBrokers());
-
-        pulsarCluster = PulsarCluster.forSpec(spec);
-
-        for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
-            brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
-            
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
-            brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
-            brokerContainer.withEnv("s3ManagedLedgerOffloadBucket", 
"pulsar-integtest");
-            brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint", 
"http://" + S3Container.NAME + ":9090");
-        }
-
-        pulsarCluster.start();
-
-        log.info("Cluster {} is setup", spec.clusterName());
-    }
 
     private static byte[] buildEntry(String pattern) {
         byte[] entry = new byte[ENTRY_SIZE];
@@ -98,6 +57,25 @@ public class TestS3Offload extends PulsarClusterTestBase {
         return entry;
     }
 
+    private S3Container s3Container;
+
+    @BeforeClass
+    public void setupS3() {
+        s3Container = new S3Container(
+            pulsarCluster.getClusterName(),
+            S3Container.NAME)
+            .withNetwork(pulsarCluster.getNetwork())
+            .withNetworkAliases(S3Container.NAME);
+        s3Container.start();
+    }
+
+    @AfterClass
+    public void teardownS3() {
+        if (null != s3Container) {
+            s3Container.stop();
+        }
+    }
+
     @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String 
adminUrl) throws Exception {
         final String tenant = "s3-offload-test-cli-" + randomName(4);
@@ -350,4 +328,5 @@ public class TestS3Offload extends PulsarClusterTestBase {
         Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
     }
 
+
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
deleted file mode 100644
index b22bd74..0000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/smoke/SmokeTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.smoke;
-
-import static org.testng.Assert.assertEquals;
-
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.TimeUnit;
-
-public class SmokeTest extends PulsarClusterTestBase {
-
-    private final static String clusterName = "test";
-
-    @Test(dataProvider = "ServiceUrls")
-    public void testPublishAndConsume(String serviceUrl) throws Exception {
-
-        this.createTenantName("smoke-test", clusterName, "smoke-admin");
-        pulsarCluster.createNamespace(clusterName);
-        String topic = "persistent://smoke-test/test/ns1/topic1";
-
-        @Cleanup
-        PulsarClient client = PulsarClient.builder()
-            .serviceUrl(serviceUrl)
-            .build();
-
-        @Cleanup
-        Consumer<String> consumer = client.newConsumer(Schema.STRING)
-            .topic(topic)
-            .subscriptionName("test-sub")
-            .ackTimeout(10, TimeUnit.SECONDS)
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscribe();
-
-        @Cleanup
-        Producer<String> producer = client.newProducer(Schema.STRING)
-            .topic(topic)
-            .enableBatching(false)
-            .producerName("effectively-once-producer")
-            .initialSequenceId(1L)
-            .create();
-
-        for (int i = 0; i < 10; i++) {
-            producer.send(("smoke-message" + i));
-        }
-        for (int i = 0; i < 10; i++) {
-            Message m = consumer.receive();
-            assertEquals("smoke-message" + i, new String(m.getData()));
-        }
-    }
-
-    private ContainerExecResult createTenantName(String tenantName,
-                                                 String clusterName,
-                                                 String roleName) throws 
Exception {
-        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
-            "tenants", "create", tenantName, "--allowed-clusters", clusterName,
-            "--admin-roles", roleName);
-        assertEquals(0, result.getExitCode());
-        return result;
-    }
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
new file mode 100644
index 0000000..e20a933
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.suites;
+
+import java.util.Map;
+import org.apache.pulsar.tests.integration.containers.CassandraContainer;
+import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.collections.Maps;
+
+public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        super.setupCluster();
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    @Override
+    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, 
PulsarClusterSpecBuilder specBuilder) {
+        PulsarClusterSpecBuilder builder = 
super.beforeSetupCluster(clusterName, specBuilder);
+
+        // start functions
+
+        // register external services
+        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+        final String kafkaServiceName = "kafka";
+        externalServices.put(
+            kafkaServiceName,
+            new KafkaContainer()
+                .withEmbeddedZookeeper()
+                .withNetworkAliases(kafkaServiceName)
+                .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
+                    .withName(kafkaServiceName)
+                    .withHostName(clusterName + "-" + kafkaServiceName)));
+        final String cassandraServiceName = "cassandra";
+        externalServices.put(
+            cassandraServiceName,
+            new CassandraContainer(clusterName));
+        builder = builder.externalServices(externalServices);
+
+        return builder;
+    }
+
+    @Override
+    public String getTestName() {
+        return "pulsar-test-suite";
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
new file mode 100644
index 0000000..d248dce
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.suites;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.stream.Stream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+@Slf4j
+public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase 
implements ITest {
+
+    protected static final int ENTRIES_PER_LEDGER = 1024;
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), 
randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .numBookies(2)
+            .numBrokers(1)
+            .clusterName(clusterName)
+            .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+
+        for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
+            brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
+            
brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+            brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
+            brokerContainer.withEnv("s3ManagedLedgerOffloadBucket", 
"pulsar-integtest");
+            brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint", 
"http://" + S3Container.NAME + ":9090");
+        }
+
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    @Override
+    public String getTestName() {
+        return "tiered-storage-test-suite";
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 2f1653a..8ff1e32 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
@@ -78,6 +77,7 @@ public class PulsarCluster {
     private final Map<String, BrokerContainer> brokerContainers;
     private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
+    private Map<String, GenericContainer<?>> externalServices = 
Collections.emptyMap();
 
     private PulsarCluster(PulsarClusterSpec spec) {
 
@@ -148,6 +148,14 @@ public class PulsarCluster {
         return zkContainer.getContainerIpAddress() + ":" + 
zkContainer.getMappedPort(ZK_PORT);
     }
 
+    public Network getNetwork() {
+        return network;
+    }
+
+    public Map<String, GenericContainer<?>> getExternalServices() {
+        return externalServices;
+    }
+
     public void start() throws Exception {
         // start the local zookeeper
         zkContainer.start();
@@ -178,20 +186,8 @@ public class PulsarCluster {
         log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
         log.info("\tHttp Service Url : {}", getHttpServiceUrl());
 
-        // start function workers
-        if (spec.numFunctionWorkers() > 0) {
-            switch (spec.functionRuntimeType()) {
-                case THREAD:
-                    
startFunctionWorkersWithThreadContainerFactory(spec.numFunctionWorkers());
-                    break;
-                case PROCESS:
-                    
startFunctionWorkersWithProcessContainerFactory(spec.numFunctionWorkers());
-                    break;
-            }
-        }
-
         // start external services
-        final Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
+        this.externalServices = spec.externalServices;
         if (null != externalServices) {
             externalServices.entrySet().parallelStream().forEach(service -> {
                 GenericContainer<?> serviceContainer = service.getValue();
@@ -206,7 +202,6 @@ public class PulsarCluster {
     private static <T extends PulsarContainer> Map<String, T> 
runNumContainers(String serviceName,
                                                                                
int numContainers,
                                                                                
Function<String, T> containerCreator) {
-        List<CompletableFuture<?>> startFutures = Lists.newArrayList();
         Map<String, T> containers = Maps.newTreeMap();
         for (int i = 0; i < numContainers; i++) {
             String name = "pulsar-" + serviceName + "-" + i;
@@ -216,8 +211,7 @@ public class PulsarCluster {
         return containers;
     }
 
-    public void stop() {
-
+    public synchronized void stop() {
         Stream<GenericContainer> containers = Streams.concat(
                 workerContainers.values().stream(),
                 brokerContainers.values().stream(),
@@ -238,11 +232,22 @@ public class PulsarCluster {
         }
     }
 
-    private void startFunctionWorkersWithProcessContainerFactory(int 
numFunctionWorkers) {
+    public synchronized void setupFunctionWorkers(String suffix, 
FunctionRuntimeType runtimeType, int numFunctionWorkers) {
+        switch (runtimeType) {
+            case THREAD:
+                startFunctionWorkersWithThreadContainerFactory(suffix, 
numFunctionWorkers);
+                break;
+            case PROCESS:
+                startFunctionWorkersWithProcessContainerFactory(suffix, 
numFunctionWorkers);
+                break;
+        }
+    }
+
+    private void startFunctionWorkersWithProcessContainerFactory(String 
suffix, int numFunctionWorkers) {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
-            "functions-worker",
+            "functions-worker-process-" + suffix,
             numFunctionWorkers,
             (name) -> new WorkerContainer(clusterName, name)
                 .withNetwork(network)
@@ -263,11 +268,11 @@ public class PulsarCluster {
         this.startWorkers();
     }
 
-    private void startFunctionWorkersWithThreadContainerFactory(int 
numFunctionWorkers) {
+    private void startFunctionWorkersWithThreadContainerFactory(String suffix, 
int numFunctionWorkers) {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
-            "functions-worker",
+            "functions-worker-thread-" + suffix,
             numFunctionWorkers,
             (name) -> new WorkerContainer(clusterName, name)
                 .withNetwork(network)
@@ -289,17 +294,38 @@ public class PulsarCluster {
         this.startWorkers();
     }
 
-    private void startWorkers() {
+    public synchronized void startWorkers() {
         // Start workers that have been initialized
         
workerContainers.values().parallelStream().forEach(WorkerContainer::start);
         log.info("Successfully started {} worker conntainers.", 
workerContainers.size());
     }
 
+    public synchronized void stopWorkers() {
+        // Stop workers that have been initialized
+        
workerContainers.values().parallelStream().forEach(WorkerContainer::stop);
+        workerContainers.clear();
+    }
+
+    public void startContainers(Map<String, GenericContainer<?>> containers) {
+        containers.forEach((name, container) -> {
+            container
+                .withNetwork(network)
+                .withNetworkAliases(name)
+                .start();
+            log.info("Successfully start container {}.", name);
+        });
+    }
+
+    public void stopContainers(Map<String, GenericContainer<?>> containers) {
+        containers.values().parallelStream().forEach(GenericContainer::stop);
+        log.info("Successfully stop containers : {}", containers);
+    }
+
     public BrokerContainer getAnyBroker() {
         return getAnyContainer(brokerContainers, "broker");
     }
 
-    public WorkerContainer getAnyWorker() {
+    public synchronized WorkerContainer getAnyWorker() {
         return getAnyContainer(workerContainers, "functions-worker");
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index e2e89dc..6181192 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.tests.integration.topologies;
 
 import lombok.extern.slf4j.Slf4j;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
 import java.util.concurrent.ThreadLocalRandom;
@@ -70,7 +68,6 @@ public abstract class PulsarClusterTestBase {
 
     protected static PulsarCluster pulsarCluster;
 
-    @BeforeClass
     public void setupCluster() throws Exception {
         this.setupCluster("");
     }
@@ -92,17 +89,23 @@ public abstract class PulsarClusterTestBase {
         return specBuilder;
     }
 
+    protected void beforeStartCluster() throws Exception {
+        // no-op
+    }
+
     protected void setupCluster(PulsarClusterSpec spec) throws Exception {
         log.info("Setting up cluster {} with {} bookies, {} brokers",
                 spec.clusterName(), spec.numBookies(), spec.numBrokers());
 
         pulsarCluster = PulsarCluster.forSpec(spec);
+
+        beforeStartCluster();
+
         pulsarCluster.start();
 
         log.info("Cluster {} is setup", spec.clusterName());
     }
 
-    @AfterClass
     public void tearDownCluster() {
         if (null != pulsarCluster) {
             pulsarCluster.stop();
diff --git a/tests/integration/src/test/resources/pulsar-process.xml 
b/tests/integration/src/test/resources/pulsar-process.xml
new file mode 100644
index 0000000..482e570
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-process.xml
@@ -0,0 +1,30 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Process Function Worker) Integration Tests" verbose="2" 
annotations="JDK">
+    <test name="pulsar-test-suite" preserve-order="true" >
+        <classes>
+            <class name="org.apache.pulsar.tests.integration.cli.CLITest" />
+            <class 
name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
+            <class 
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest" 
/>
+        </classes>
+    </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-thread.xml 
b/tests/integration/src/test/resources/pulsar-thread.xml
new file mode 100644
index 0000000..14a908f
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-thread.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Thread Function Worker) Integration Tests" verbose="2" 
annotations="JDK">
+    <test name="pulsar-thread-test-suite" preserve-order="true">
+        <classes>
+            <class 
name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsThreadTest" 
/>
+        </classes>
+    </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar.xml 
b/tests/integration/src/test/resources/pulsar.xml
new file mode 100644
index 0000000..ac21f4c
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -0,0 +1,30 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<!-- TODO: we have to put suite files in one file to avoid executing TESTNG 
test suites multiple times.
+           see {@link https://github.com/cbeust/testng/issues/508} -->
+<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
+    <suite-files>
+        <suite-file path="./pulsar-process.xml" />
+        <suite-file path="./pulsar-thread.xml" />
+        <suite-file path="./tiered-storage.xml" />
+    </suite-files>
+</suite>
diff --git a/tests/integration/src/test/resources/tiered-storage.xml 
b/tests/integration/src/test/resources/tiered-storage.xml
new file mode 100644
index 0000000..8cbdaa7
--- /dev/null
+++ b/tests/integration/src/test/resources/tiered-storage.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Tiered Storage) Integration Tests" verbose="2" 
annotations="JDK">
+    <test name="tiered-storage-test-suite" preserve-order="true">
+        <classes>
+            <class 
name="org.apache.pulsar.tests.integration.offload.TestS3Offload" />
+        </classes>
+    </test>
+</suite>
\ No newline at end of file

Reply via email to