sijie closed pull request #2587: [tests] improve connector related integration 
tests
URL: https://github.com/apache/incubator-pulsar/pull/2587
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 08ff859e44..a509e19254 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      
<source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/docker/pom.xml b/docker/pom.xml
index bdc99f7297..302bda80cc 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -31,9 +31,6 @@
   <groupId>org.apache.pulsar</groupId>
   <artifactId>docker-images</artifactId>
   <name>Apache Pulsar :: Docker Images</name>
-  <properties>
-    <docker.organization>apachepulsar</docker.organization>
-  </properties>
   <modules>
     <module>pulsar</module>
     <module>grafana</module>
diff --git a/pom.xml b/pom.xml
index 6d45a0093d..91ba14c55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <redirectTestOutputToFile>true</redirectTestOutputToFile>
     <testRealAWS>false</testRealAWS>
     <testRetryCount>1</testRetryCount>
+    <docker.organization>apachepulsar</docker.organization>
 
     <!-- pin the protobuf-shaded version to make the pulsar build friendly to 
intellij -->
     
<pulsar.protobuf.shaded.version>2.1.0-incubating</pulsar.protobuf.shaded.version>
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2937ca0caf..fb7a76cdff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -233,66 +233,6 @@ public void testReplayOnConsumerDisconnect() throws 
Exception {
         deleteTopic(topicName);
     }
 
-    @Test
-    public void testConsumersWithDifferentPermits() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
-        final String subName = "sub4";
-        final int numMsgs = 10000;
-
-        final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
-        final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(numMsgs);
-
-        int recvQ1 = 10;
-        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
-                .messageListener((consumer, msg) -> {
-                    msgCountConsumer1.incrementAndGet();
-                    try {
-                        consumer.acknowledge(msg);
-                        latch.countDown();
-                    } catch (PulsarClientException e) {
-                        fail("Should not fail");
-                    }
-                }).subscribe();
-
-        int recvQ2 = 1;
-        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
-                .messageListener((consumer, msg) -> {
-                    msgCountConsumer2.incrementAndGet();
-                    try {
-                        consumer.acknowledge(msg);
-                        latch.countDown();
-                    } catch (PulsarClientException e) {
-                        fail("Should not fail");
-                    }
-                }).subscribe();
-
-        List<CompletableFuture<MessageId>> futures = 
Lists.newArrayListWithCapacity(numMsgs);
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-            .enableBatching(false)
-            .maxPendingMessages(numMsgs + 1)
-            .messageRoutingMode(MessageRoutingMode.SinglePartition)
-            .create();
-        for (int i = 0; i < numMsgs; i++) {
-            String message = "msg-" + i;
-            futures.add(producer.sendAsync(message.getBytes()));
-        }
-        FutureUtil.waitForAll(futures).get();
-        producer.close();
-
-        latch.await(5, TimeUnit.SECONDS);
-
-        assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + 
recvQ2), numMsgs * 0.1);
-        assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), 
numMsgs * 0.1);
-
-        consumer1.close();
-        consumer2.close();
-
-        deleteTopic(topicName);
-    }
-
     // this test is good to have to see the distribution, but every now and 
then it gets slightly different than the
     // expected numbers. keeping this disabled to not break the build, but 
nevertheless this gives good insight into
     // how the round robin distribution algorithm is behaving
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4ba7340016..b3f86eae0e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -146,7 +146,8 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) 
throws Exception {
         ThreadContext.put("functionname", 
instanceConfig.getFunctionDetails().getName());
         ThreadContext.put("instance", instanceConfig.getInstanceId());
 
-        log.info("Starting Java Instance {}", 
instanceConfig.getFunctionDetails().getName());
+        log.info("Starting Java Instance {} : \n Details = {}",
+            instanceConfig.getFunctionDetails().getName(), 
instanceConfig.getFunctionDetails());
 
         // start the function thread
         loadJars();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 76375dcc67..db9e880b68 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -78,6 +78,9 @@ public TopicSchema(PulsarClient client) {
     private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
         if (GenericRecord.class.isAssignableFrom(clazz)) {
             return SchemaType.AUTO;
+        } else if (byte[].class.equals(clazz)) {
+            // if function uses bytes, we should ignore
+            return SchemaType.NONE;
         } else {
             Optional<SchemaInfo> schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
             if (schema.isPresent()) {
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
similarity index 94%
rename from 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
rename to 
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 3760d4072b..86546f3c96 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -51,7 +51,7 @@
  * Users need to implement extractKeyValue function to use this sink.
  * This class assumes that the input will be JSON documents
  */
-public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> {
+public class ElasticSearchSink implements Sink<byte[]> {
 
     protected static final String DOCUMENT = "doc";
 
@@ -74,7 +74,7 @@ public void close() throws Exception {
 
     @Override
     public void write(Record<byte[]> record) {
-        KeyValue<K, V> keyValue = extractKeyValue(record);
+        KeyValue<String, byte[]> keyValue = extractKeyValue(record);
         IndexRequest indexRequest = 
Requests.indexRequest(elasticSearchConfig.getIndexName());
         indexRequest.type(DOCUMENT);
         indexRequest.source(keyValue.getValue(), XContentType.JSON);
@@ -91,7 +91,10 @@ public void write(Record<byte[]> record) {
         }
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
+    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElseGet(null);
+        return new KeyValue<>(key, record.getValue());
+    }
 
     private void createIndexIfNeeded() throws IOException {
         GetIndexRequest request = new GetIndexRequest();
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
deleted file mode 100644
index 6cfa03d5c0..0000000000
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * Concrete ElasticSearch sink.
- * This class assumes that the input will be JSON documents
- */
-public class ElasticSearchStringSink extends ElasticSearchAbstractSink<String, 
String> {
-
-    @Override
-    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
-        String key = record.getKey().orElseGet(() -> new 
String(record.getValue()));
-        return new KeyValue<>(key, new String(record.getValue()));
-    }
-}
diff --git 
a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
index 0307516cc8..97789e9851 100644
--- 
a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
+++ 
b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-name: Elastic Search
+name: elastic_search
 description: Writes data into Elastic Search
-sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchStringSink
+sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index ea2b886e39..f1888293f8 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -56,7 +56,7 @@
     @Mock
     protected SinkContext mockSinkContext;   
     protected Map<String, Object> map;
-    protected ElasticSearchStringSink sink;
+    protected ElasticSearchSink sink;
     
     @BeforeClass
     public static final void init() {
@@ -71,7 +71,7 @@ public static final void init() {
     public final void setUp() throws Exception {
         map = new HashMap<String, Object> ();
         map.put("elasticSearchUrl", "http://localhost:9200";);
-        sink = new ElasticSearchStringSink();
+        sink = new ElasticSearchSink();
         
         mockRecord = mock(Record.class);
         mockSinkContext = mock(SinkContext.class);
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index a92a368f1c..50ce4b97b6 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -70,6 +70,10 @@ public void close() throws IOException {
         }
     }
 
+    protected Properties beforeCreateProducer(Properties props) {
+        return props;
+    }
+
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
@@ -89,7 +93,7 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
kafkaSinkConfig.getKeySerializerClass());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
kafkaSinkConfig.getValueSerializerClass());
 
-        producer = new KafkaProducer<>(props);
+        producer = new KafkaProducer<>(beforeCreateProducer(props));
 
         log.info("Kafka sink started.");
     }
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
similarity index 50%
rename from 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
rename to 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 89e3e7f8d6..9ce2bdcb77 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -19,16 +19,31 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 
 /**
- * Kafka sink that treats incoming messages on the input topic as Strings
- * and write identical key/value pairs.
+ * Kafka sink should treats incoming messages as pure bytes. So we don't
+ * apply schema into it.
  */
-public class KafkaStringSink extends KafkaAbstractSink<String, String> {
+@Slf4j
+public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
+
+    @Override
+    protected Properties beforeCreateProducer(Properties props) {
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        log.info("Created kafka producer config : {}", props);
+        return props;
+    }
+
     @Override
-    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
-        return new KeyValue<>(record.getKey().orElse(null), new 
String(record.getValue()));
+    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
+        return new KeyValue<>(record.getKey().orElse(null), record.getValue());
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index a7bc81355c..7afc154688 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,4 @@
 name: kafka
 description: Kafka source and sink connector
 sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource
-sinkClass: org.apache.pulsar.io.kafka.KafkaStringSink
+sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 8b8cfd340d..4a48bc09ce 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -123,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
 
 Example output:
 ```json
-[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
 sink 
connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ
 source 
connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest
 data from Twitter 
firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
+[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis
 sink 
connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ
 source 
connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest
 data from Twitter 
firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
 ```
 
 If an error occurred while starting Pulsar service, you may be able to seen 
exception at the terminal you are running `pulsar/standalone`,
diff --git 
a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md 
b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
index 7b21379964..afa8e31a6b 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
+++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
@@ -124,7 +124,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
 
 Example output:
 ```json
-[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis
 sink 
connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ
 source 
connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest
 data from Twitter 
firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
+[{"name":"aerospike","description":"Aerospike database 
sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes
 data into 
Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka
 source and sink 
connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis
 sink 
connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ
 source 
connector","sourceClass":"org.apache.pulsar.io.rabbitmq.RabbitMQSource"},{"name":"twitter","description":"Ingest
 data from Twitter 
firehose","sourceClass":"org.apache.pulsar.io.twitter.TwitterFireHose"}]
 ```
 
 If an error occurred while starting Pulsar service, you may be able to seen 
exception at the terminal you are running `pulsar/standalone`,
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f11eef637..e9ea6186c3 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.Test;
 
 /**
@@ -62,17 +63,17 @@
 
     @Test
     public void testKafkaSink() throws Exception {
-        testSink(new KafkaSinkTester(), true);
+        testSink(new KafkaSinkTester(), true, new KafkaSourceTester());
     }
 
     @Test
     public void testCassandraSink() throws Exception {
-        testSink(new CassandraSinkTester(), true);
+        testSink(CassandraSinkTester.createTester(true), true);
     }
 
     @Test
     public void testCassandraArchiveSink() throws Exception {
-        testSink(new CassandraSinkArchiveTester(), false);
+        testSink(CassandraSinkTester.createTester(false), false);
     }
     
     @Test(enabled = false)
@@ -91,8 +92,31 @@ public void testElasticSearchSink() throws Exception {
     }
     
     private void testSink(SinkTester tester, boolean builtin) throws Exception 
{
-        tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
+        tester.startServiceContainer(pulsarCluster);
+        try {
+            runSinkTester(tester, builtin);
+        } finally {
+            tester.stopServiceContainer(pulsarCluster);
+        }
+    }
+
 
+    private <ServiceContainerT extends GenericContainer>  void 
testSink(SinkTester<ServiceContainerT> sinkTester,
+                                                                        
boolean builtinSink,
+                                                                        
SourceTester<ServiceContainerT> sourceTester)
+            throws Exception {
+        ServiceContainerT serviceContainer = 
sinkTester.startServiceContainer(pulsarCluster);
+        try {
+            runSinkTester(sinkTester, builtinSink);
+            if (null != sourceTester) {
+                sourceTester.setServiceContainer(serviceContainer);
+                testSource(sourceTester);
+            }
+        } finally {
+            sinkTester.stopServiceContainer(pulsarCluster);
+        }
+    }
+    private void runSinkTester(SinkTester tester, boolean builtin) throws 
Exception {
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String inputTopicName = "test-sink-connector-"
@@ -357,14 +381,7 @@ protected void getSinkInfoNotFound(String tenant, String 
namespace, String sinkN
     // Source Test
     //
 
-    @Test
-    public void testKafkaSource() throws Exception {
-        testSource(new KafkaSourceTester());
-    }
-
     private void testSource(SourceTester tester)  throws Exception {
-        tester.findSourceServiceContainer(pulsarCluster.getExternalServices());
-
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "test-source-connector-"
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 6578d08cf2..6f4d012e24 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -161,7 +161,7 @@ public String generateUpdateFunctionCommand() {
     }
 
     public String generateUpdateFunctionCommand(String codeFile) {
-        StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m 
");
+        StringBuilder commandBuilder = new StringBuilder();
         if (adminUrl == null) {
             commandBuilder.append("/pulsar/bin/pulsar-admin functions update");
         } else {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
deleted file mode 100644
index 86c76894c5..0000000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
-
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkState;
-import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * A tester for testing cassandra sink submitted as an archive.
- */
-@Slf4j
-public class CassandraSinkArchiveTester extends SinkTester {
-
-    private static final String NAME = "cassandra";
-
-    private static final String ROOTS = "cassandra";
-    private static final String KEY = "key";
-    private static final String COLUMN = "col";
-
-    private final String keySpace;
-    private final String tableName;
-
-    private CassandraContainer cassandraCluster;
-
-    private Cluster cluster;
-    private Session session;
-
-    public CassandraSinkArchiveTester() {
-        
super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", 
"org.apache.pulsar.io.cassandra.CassandraStringSink");
-
-        String suffix = randomName(8) + "_" + System.currentTimeMillis();
-        this.keySpace = "keySpace_" + suffix;
-        this.tableName = "tableName_" + suffix;
-
-        sinkConfig.put("roots", ROOTS);
-        sinkConfig.put("keyspace", keySpace);
-        sinkConfig.put("columnFamily", tableName);
-        sinkConfig.put("keyname", KEY);
-        sinkConfig.put("columnName", COLUMN);
-    }
-
-    @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof CassandraContainer,
-            "No kafka service found in the cluster");
-
-        this.cassandraCluster = (CassandraContainer) container;
-    }
-
-    @Override
-    public void prepareSink() {
-        // build the sink
-        cluster = Cluster.builder()
-            .addContactPoint("localhost")
-            .withPort(cassandraCluster.getCassandraPort())
-            .build();
-
-        // connect to the cluster
-        session = cluster.connect();
-        log.info("Connecting to cassandra cluster at localhost:{}", 
cassandraCluster.getCassandraPort());
-
-        String createKeySpace =
-            "CREATE KEYSPACE " + keySpace
-                + " WITH replication = {'class':'SimpleStrategy', 
'replication_factor':1}; ";
-        log.info(createKeySpace);
-        session.execute(createKeySpace);
-        session.execute("USE " + keySpace);
-
-        String createTable = "CREATE TABLE " + tableName
-            + "(" + KEY + " text PRIMARY KEY, "
-            + COLUMN + " text);";
-        log.info(createTable);
-        session.execute(createTable);
-    }
-
-    @Override
-    public void validateSinkResult(Map<String, String> kvs) {
-        String query = "SELECT * FROM " + tableName + ";";
-        ResultSet result = session.execute(query);
-        List<Row> rows = result.all();
-        assertEquals(kvs.size(), rows.size());
-        for (Row row : rows) {
-            String key = row.getString(KEY);
-            String value = row.getString(COLUMN);
-
-            String expectedValue = kvs.get(key);
-            assertNotNull(expectedValue);
-            assertEquals(expectedValue, value);
-        }
-    }
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index c9d3e5a11f..3309358abc 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -24,12 +24,11 @@
 import com.datastax.driver.core.Session;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
 import java.util.List;
 import java.util.Map;
 
-import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -38,7 +37,15 @@
  * A tester for testing cassandra sink.
  */
 @Slf4j
-public class CassandraSinkTester extends SinkTester {
+public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+
+    public static CassandraSinkTester createTester(boolean builtin) {
+        if (builtin) {
+            return new CassandraSinkTester(builtin);
+        } else {
+            return new CassandraSinkTester();
+        }
+    }
 
     private static final String NAME = "cassandra";
 
@@ -49,13 +56,11 @@
     private final String keySpace;
     private final String tableName;
 
-    private CassandraContainer cassandraCluster;
-
     private Cluster cluster;
     private Session session;
 
-    public CassandraSinkTester() {
-        super(SinkType.CASSANDRA);
+    private CassandraSinkTester() {
+        super(NAME, 
"/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", 
"org.apache.pulsar.io.cassandra.CassandraStringSink");
 
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.keySpace = "keySpace_" + suffix;
@@ -68,13 +73,23 @@ public CassandraSinkTester() {
         sinkConfig.put("columnName", COLUMN);
     }
 
-    @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof CassandraContainer,
-            "No kafka service found in the cluster");
+    private CassandraSinkTester(boolean builtin) {
+        super(NAME, SinkType.CASSANDRA);
+
+        String suffix = randomName(8) + "_" + System.currentTimeMillis();
+        this.keySpace = "keySpace_" + suffix;
+        this.tableName = "tableName_" + suffix;
 
-        this.cassandraCluster = (CassandraContainer) container;
+        sinkConfig.put("roots", ROOTS);
+        sinkConfig.put("keyspace", keySpace);
+        sinkConfig.put("columnFamily", tableName);
+        sinkConfig.put("keyname", KEY);
+        sinkConfig.put("columnName", COLUMN);
+    }
+
+    @Override
+    protected CassandraContainer createSinkService(PulsarCluster cluster) {
+        return new CassandraContainer(cluster.getClusterName());
     }
 
     @Override
@@ -82,12 +97,12 @@ public void prepareSink() {
         // build the sink
         cluster = Cluster.builder()
             .addContactPoint("localhost")
-            .withPort(cassandraCluster.getCassandraPort())
+            .withPort(serviceContainer.getCassandraPort())
             .build();
 
         // connect to the cluster
         session = cluster.connect();
-        log.info("Connecting to cassandra cluster at localhost:{}", 
cassandraCluster.getCassandraPort());
+        log.info("Connecting to cassandra cluster at localhost:{}", 
serviceContainer.getCassandraPort());
 
         String createKeySpace =
             "CREATE KEYSPACE " + keySpace
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
index 0effc8eadd..eee208e2e3 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
@@ -18,57 +18,57 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.util.Map;
 
-import org.apache.http.Header;
 import org.apache.http.HttpHost;
 import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.testcontainers.containers.GenericContainer;
 
-public class ElasticSearchSinkTester extends SinkTester {
-    
+public class ElasticSearchSinkTester extends 
SinkTester<ElasticSearchContainer> {
+
     private RestHighLevelClient elasticClient;
 
     public ElasticSearchSinkTester() {
-        super(SinkType.ELASTIC_SEARCH);
+        super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH);
         
-        sinkConfig.put("elasticSearchUrl", "http://localhost:9200";);
+        sinkConfig.put("elasticSearchUrl", "http://"; + 
ElasticSearchContainer.NAME + ":9200");
         sinkConfig.put("indexName", "test-index");
     }
 
+
     @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
externalServices) {
-        GenericContainer<?> container = 
externalServices.get(ElasticSearchContainer.NAME);
-        checkState(container instanceof ElasticSearchContainer,
-            "No ElasticSearch service found in the cluster");
+    protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
+        return new ElasticSearchContainer(cluster.getClusterName());
     }
 
     @Override
     public void prepareSink() throws Exception {
-        RestClientBuilder builder = RestClient.builder(new 
HttpHost("localhost", 9200, "http"));
+        RestClientBuilder builder = RestClient.builder(
+            new HttpHost(
+                "localhost",
+                serviceContainer.getMappedPort(9200),
+                "http"));
         elasticClient = new RestHighLevelClient(builder);
     }
 
     @Override
     public void validateSinkResult(Map<String, String> kvs) {
-        
         SearchRequest searchRequest = new SearchRequest("test-index");
         searchRequest.types("doc");
         
         try {
-            Header headers = null;
-            SearchResponse searchResult = elasticClient.search(searchRequest, 
headers);
-            assertTrue(searchResult.getHits().getTotalHits() > 0);
+            SearchResponse searchResult = elasticClient.search(searchRequest);
+            assertTrue(searchResult.getHits().getTotalHits() > 0, 
searchResult.toString());
         } catch (Exception e) {
-            e.printStackTrace();
+            fail("Encountered exception on validating elastic search results", 
e);
         }
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
index 46c5f242df..957b93acbd 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
@@ -21,18 +21,14 @@
 import java.util.Map;
 
 import org.apache.pulsar.tests.integration.containers.HdfsContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
-import static com.google.common.base.Preconditions.checkState;
-
-public class HdfsSinkTester extends SinkTester {
+public class HdfsSinkTester extends SinkTester<HdfsContainer> {
        
        private static final String NAME = "HDFS";
        
-       private HdfsContainer hdfsCluster;
-       
        public HdfsSinkTester() {
-               super(SinkType.HDFS);
+               super(NAME, SinkType.HDFS);
                
                // TODO How do I get the core-site.xml, and hdfs-site.xml files 
from the container?
                sinkConfig.put("hdfsConfigResources", "");
@@ -40,20 +36,18 @@ public HdfsSinkTester() {
        }
 
        @Override
-       public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-               GenericContainer<?> container = containers.get(NAME);   
-               checkState(container instanceof HdfsContainer, "No HDFS service 
found in the cluster");
-           this.hdfsCluster = (HdfsContainer) container;
+       protected HdfsContainer createSinkService(PulsarCluster cluster) {
+               return new HdfsContainer(cluster.getClusterName());
        }
 
        @Override
        public void prepareSink() throws Exception {
                // Create the test directory
-               hdfsCluster.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", 
"/tmp/testing");
-               hdfsCluster.execInContainer("/hadoop/bin/hdfs", "-chown", 
"tester:testing", "/tmp/testing");
+               serviceContainer.execInContainer("/hadoop/bin/hdfs","dfs", 
"-mkdir", "/tmp/testing");
+               serviceContainer.execInContainer("/hadoop/bin/hdfs", "-chown", 
"tester:testing", "/tmp/testing");
                
                // Execute all future commands as the "tester" user
-               hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester");
+               serviceContainer.execInContainer("export 
HADOOP_USER_NAME=tester");
        }
 
        @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 7c14ba96c0..72d9b0152c 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -18,22 +18,23 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
+import com.github.dockerjava.api.command.CreateContainerCmd;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Map;
+import java.util.function.Consumer;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.MySQLContainer;
 
 /**
@@ -41,7 +42,7 @@
  * This will use MySql as DB server
  */
 @Slf4j
-public class JdbcSinkTester extends SinkTester {
+public class JdbcSinkTester extends SinkTester<MySQLContainer> {
 
     /**
      * A Simple class to test jdbc class´╝î
@@ -57,14 +58,14 @@
     }
 
     private static final String NAME = "jdbc";
+    private static final String MYSQL = "mysql";
 
-    private MySQLContainer mySQLContainer;
     private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
     private String tableName = "test";
     private Connection connection;
 
     public JdbcSinkTester() {
-        super(SinkType.JDBC);
+        super(NAME, SinkType.JDBC);
 
         // container default value is test
         sinkConfig.put("userName", "test");
@@ -79,21 +80,28 @@ public JdbcSinkTester() {
     }
 
     @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get("mysql");
-        checkState(container instanceof MySQLContainer,
-            "No MySQL service found in the cluster");
-
-        this.mySQLContainer = (MySQLContainer) container;
-        log.info("find sink service container: {}", 
mySQLContainer.getContainerName());
+    protected MySQLContainer createSinkService(PulsarCluster cluster) {
+        return (MySQLContainer) new MySQLContainer()
+            .withUsername("test")
+            .withPassword("test")
+            .withDatabaseName("test")
+            .withNetworkAliases(MYSQL)
+            .withCreateContainerCmdModifier(new Consumer<CreateContainerCmd>() 
{
+                @Override
+                public void accept(CreateContainerCmd createContainerCmd) {
+                    createContainerCmd
+                        .withName(MYSQL)
+                        .withHostName(cluster.getClusterName() + "-" + MYSQL);
+                }
+            });
     }
 
     @Override
     public void prepareSink() throws Exception {
-        String jdbcUrl = mySQLContainer.getJdbcUrl();
+        String jdbcUrl = serviceContainer.getJdbcUrl();
         // we need set mysql server address in cluster network.
-        sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test");
-        String driver = mySQLContainer.getDriverClassName();
+        sinkConfig.put("jdbcUrl", "jdbc:mysql://" + MYSQL + ":3306/test");
+        String driver = serviceContainer.getDriverClassName();
         Class.forName(driver);
 
         connection = DriverManager.getConnection(jdbcUrl, "test", "test");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index ff79e1a302..6713cc181f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -32,8 +31,8 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
@@ -41,18 +40,15 @@
  * A tester for testing kafka sink.
  */
 @Slf4j
-public class KafkaSinkTester extends SinkTester {
+public class KafkaSinkTester extends SinkTester<KafkaContainer> {
 
     private static final String NAME = "kafka";
 
     private final String kafkaTopicName;
-
-    private KafkaContainer kafkaContainer;
-
     private KafkaConsumer<String, String> kafkaConsumer;
 
     public KafkaSinkTester() {
-        super(SinkType.KAFKA);
+        super(NAME, SinkType.KAFKA);
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_sink_topic_" + suffix;
 
@@ -64,17 +60,19 @@ public KafkaSinkTester() {
     }
 
     @Override
-    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof KafkaContainer,
-            "No kafka service found in the cluster");
-
-        this.kafkaContainer = (KafkaContainer) container;
+    protected KafkaContainer createSinkService(PulsarCluster cluster) {
+        final String kafkaServiceName = NAME;
+        return new KafkaContainer()
+                .withEmbeddedZookeeper()
+                .withNetworkAliases(kafkaServiceName)
+                .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
+                    .withName(kafkaServiceName)
+                    .withHostName(cluster.getClusterName() + "-" + 
kafkaServiceName));
     }
 
     @Override
     public void prepareSink() throws Exception {
-        ExecResult execResult = kafkaContainer.execInContainer(
+        ExecResult execResult = serviceContainer.execInContainer(
             "/usr/bin/kafka-topics",
             "--create",
             "--zookeeper",
@@ -91,7 +89,7 @@ public void prepareSink() throws Exception {
 
         kafkaConsumer = new KafkaConsumer<>(
             ImmutableMap.of(
-                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
serviceContainer.getBootstrapServers(),
                 ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
             ),
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index 4928f00cfb..cee17b1451 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
-import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertTrue;
 
@@ -35,7 +34,6 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
@@ -43,7 +41,7 @@
  * A tester for testing kafka source.
  */
 @Slf4j
-public class KafkaSourceTester extends SourceTester {
+public class KafkaSourceTester extends SourceTester<KafkaContainer> {
 
     private static final String NAME = "kafka";
 
@@ -68,12 +66,8 @@ public KafkaSourceTester() {
     }
 
     @Override
-    public void findSourceServiceContainer(Map<String, GenericContainer<?>> 
containers) {
-        GenericContainer<?> container = containers.get(NAME);
-        checkState(container instanceof KafkaContainer,
-            "No kafka service found in the cluster");
-
-        this.kafkaContainer = (KafkaContainer) container;
+    public void setServiceContainer(KafkaContainer container) {
+        this.kafkaContainer = container;
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index d2917e6a3e..2dd0759de2 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import lombok.Getter;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
@@ -28,7 +29,7 @@
  * A tester used for testing a specific sink.
  */
 @Getter
-public abstract class SinkTester {
+public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
 
     public enum SinkType {
         UNDEFINED,
@@ -39,19 +40,23 @@
         ELASTIC_SEARCH
     }
 
+    protected final String networkAlias;
     protected final SinkType sinkType;
     protected final String sinkArchive;
     protected final String sinkClassName;
     protected final Map<String, Object> sinkConfig;
+    protected ServiceContainerT serviceContainer;
 
-    public SinkTester(SinkType sinkType) {
+    public SinkTester(String networkAlias, SinkType sinkType) {
+        this.networkAlias = networkAlias;
         this.sinkType = sinkType;
         this.sinkArchive = null;
         this.sinkClassName = null;
         this.sinkConfig = Maps.newHashMap();
     }
 
-    public SinkTester(String sinkArchive, String sinkClassName) {
+    public SinkTester(String networkAlias, String sinkArchive, String 
sinkClassName) {
+        this.networkAlias = networkAlias;
         this.sinkType = SinkType.UNDEFINED;
         this.sinkArchive = sinkArchive;
         this.sinkClassName = sinkClassName;
@@ -62,7 +67,19 @@ public SinkTester(String sinkArchive, String sinkClassName) {
         return Schema.STRING;
     }
 
-    public abstract void findSinkServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
+    protected abstract ServiceContainerT createSinkService(PulsarCluster 
cluster);
+
+    public ServiceContainerT startServiceContainer(PulsarCluster cluster) {
+        this.serviceContainer = createSinkService(cluster);
+        cluster.startService(networkAlias, serviceContainer);
+        return serviceContainer;
+    }
+
+    public void stopServiceContainer(PulsarCluster cluster) {
+        if (null != serviceContainer) {
+            cluster.stopService(networkAlias, serviceContainer);
+        }
+    }
 
     public SinkType sinkType() {
         return sinkType;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index dc58f2ffbb..f1feb70eef 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -27,7 +27,7 @@
  * A tester used for testing a specific source.
  */
 @Getter
-public abstract class SourceTester {
+public abstract class SourceTester<ServiceContainerT extends GenericContainer> 
{
 
     protected final String sourceType;
     protected final Map<String, Object> sourceConfig;
@@ -37,7 +37,7 @@ protected SourceTester(String sourceType) {
         this.sourceConfig = Maps.newHashMap();
     }
 
-    public abstract void findSourceServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
+    public abstract void setServiceContainer(ServiceContainerT 
serviceContainer);
 
     public String sourceType() {
         return sourceType;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index b4a6b83f58..20b9da0373 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -18,19 +18,10 @@
  */
 package org.apache.pulsar.tests.integration.suites;
 
-import java.util.Map;
-import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
-import org.apache.pulsar.tests.integration.containers.HdfsContainer;
-import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.MySQLContainer;
 import org.testng.ITest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
-import org.testng.collections.Maps;
 
 public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
 
@@ -46,47 +37,6 @@ public void tearDownCluster() {
         super.tearDownCluster();
     }
 
-    @Override
-    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, 
PulsarClusterSpecBuilder specBuilder) {
-        PulsarClusterSpecBuilder builder = 
super.beforeSetupCluster(clusterName, specBuilder);
-
-        // start functions
-
-        // register external services
-        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-
-        final String kafkaServiceName = "kafka";
-        externalServices.put(
-            kafkaServiceName,
-            new KafkaContainer()
-                .withEmbeddedZookeeper()
-                .withNetworkAliases(kafkaServiceName)
-                .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
-                    .withName(kafkaServiceName)
-                    .withHostName(clusterName + "-" + kafkaServiceName)));
-
-        final String cassandraServiceName = "cassandra";
-        externalServices.put(
-            cassandraServiceName,
-            new CassandraContainer(clusterName));
-
-        // use mySQL for jdbc test
-        final String jdbcServiceName = "mysql";
-        externalServices.put(
-            jdbcServiceName,
-            new MySQLContainer()
-                .withExposedPorts(3306));
-        
-        externalServices.put(
-                ElasticSearchContainer.NAME, 
-                new ElasticSearchContainer(ElasticSearchContainer.NAME)
-                .withExposedPorts(9200));
-
-        builder = builder.externalServices(externalServices);
-
-        return builder;
-    }
-
     @Override
     public String getTestName() {
         return "pulsar-test-suite";
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index f78097b65c..af6071220d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -209,6 +209,23 @@ public void start() throws Exception {
         }
     }
 
+    public void startService(String networkAlias,
+                             GenericContainer<?> serviceContainer) {
+        log.info("Starting external service {} ...", networkAlias);
+        serviceContainer.withNetwork(network);
+        serviceContainer.withNetworkAliases(networkAlias);
+        serviceContainer.start();
+        log.info("Successfully start external service {}", networkAlias);
+    }
+
+    public void stopService(String networkAlias,
+                            GenericContainer<?> serviceContainer) {
+        log.info("Stopping external service {} ...", networkAlias);
+        serviceContainer.stop();
+        log.info("Successfully stop external service {}", networkAlias);
+    }
+
+
     private static <T extends PulsarContainer> Map<String, T> 
runNumContainers(String serviceName,
                                                                                
int numContainers,
                                                                                
Function<String, T> containerCreator) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to