This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 555dd57   FunctionActioner should set the type class name for both 
source and sink (#2167)
555dd57 is described below

commit 555dd57a424956adf1d5a3e46b927eed8660a1e9
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Jul 15 22:36:42 2018 -0700

     FunctionActioner should set the type class name for both source and sink 
(#2167)
    
    * FunctionActioner should set the type class name for both source and sink
    
    *Motivation*
    
    The type class names are not set correctly for builtin connectors by 
FunctionActioner.
    
    *Changes*
    
    Set the type class name inferred from NAR package in FunctionActioner.
    
    * Add an integration for connectors
    
    * Fix ProcessRuntimeTest
---
 pom.xml                                            |   6 +
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   2 +-
 .../pulsar/functions/runtime/ProcessRuntime.java   |   4 +-
 .../functions/runtime/ProcessRuntimeTest.java      |   4 +-
 .../pulsar/functions/worker/FunctionActioner.java  |  17 ++
 .../tests/containers/CassandraContainer.java       |   4 +
 .../pulsar/tests/topologies/PulsarCluster.java     |   4 +-
 .../tests/topologies/PulsarClusterTestBase.java    |  21 +-
 tests/integration/semantics/pom.xml                |   5 +
 .../functions/PulsarFunctionsTestBase.java         |  30 ++-
 .../runtime/PulsarFunctionsRuntimeTest.java        |   8 -
 .../tests/integration/io/CassandraSinkTester.java  | 113 +++++++++
 .../tests/integration/io/PulsarIOSinkTest.java     | 254 +++++++++++++++++++++
 .../pulsar/tests/integration/io/SinkTester.java    |  52 +++++
 .../pulsar/tests/integration/utils/TestUtils.java  |  35 +++
 15 files changed, 521 insertions(+), 38 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7b59f99..2275471 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
     <arquillian-junit.version>1.1.14.Final</arquillian-junit.version>
+    <cassandra.version>3.5.0</cassandra.version>
     <disruptor.version>3.4.0</disruptor.version>
     <testcontainers.version>1.8.0</testcontainers.version>
 
@@ -807,6 +808,11 @@ flexible messaging model and an intuitive client 
API.</description>
         <artifactId>arquillian-junit-standalone</artifactId>
         <version>${arquillian-junit.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.datastax.cassandra</groupId>
+        <artifactId>cassandra-driver-core</artifactId>
+        <version>${cassandra.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 81c40cc..2a38c81 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1058,7 +1058,7 @@ public class CmdFunctions extends CmdBase {
                 });
             for (RuntimeSpawner spawner : spawners) {
                 spawner.join();
-                log.info("RuntimeSpawner quit because of {}", 
spawner.getRuntime().getDeathException());
+                log.info("RuntimeSpawner quit because of", 
spawner.getRuntime().getDeathException());
             }
 
         }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 95d7a53..8054ff8 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -191,7 +191,7 @@ class ProcessRuntime implements Runtime {
             if 
(instanceConfig.getFunctionDetails().getSource().getTypeClassName() != null
                 && 
!instanceConfig.getFunctionDetails().getSource().getTypeClassName().isEmpty()) {
                 args.add("--source_type_classname");
-                
args.add(instanceConfig.getFunctionDetails().getSource().getTypeClassName());
+                args.add("\"" + 
instanceConfig.getFunctionDetails().getSource().getTypeClassName() + "\"");
             }
         }
 
@@ -222,7 +222,7 @@ class ProcessRuntime implements Runtime {
             if 
(instanceConfig.getFunctionDetails().getSink().getTypeClassName() != null
                     && 
!instanceConfig.getFunctionDetails().getSink().getTypeClassName().isEmpty()) {
                 args.add("--sink_type_classname");
-                
args.add(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
+                args.add("\"" + 
instanceConfig.getFunctionDetails().getSink().getTypeClassName() + "\"");
             }
         }
         if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index bad505d..d9cef64 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -136,12 +136,12 @@ public class ProcessRuntimeTest {
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(35)
                 + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName()
-                + " --source_type_classname " + 
config.getFunctionDetails().getSource().getTypeClassName()
+                + " --source_type_classname \"" + 
config.getFunctionDetails().getSource().getTypeClassName() + "\""
                 + " --source_subscription_type " + 
config.getFunctionDetails().getSource().getSubscriptionType().name()
                 + " --source_topics_serde_classname " + new 
Gson().toJson(topicsToSerDeClassName)
                 + " --topics_pattern " + 
config.getFunctionDetails().getSource().getTopicsPattern()
                 + " --sink_classname " + 
config.getFunctionDetails().getSink().getClassName()
-                + " --sink_type_classname " + 
config.getFunctionDetails().getSink().getTypeClassName()
+                + " --sink_type_classname \"" + 
config.getFunctionDetails().getSink().getTypeClassName() + "\""
                 + " --sink_topic " + 
config.getFunctionDetails().getSink().getTopic()
                 + " --sink_serde_classname " + 
config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --state_storage_serviceurl " + stateStorageServiceUrl;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 8ad2921..0927360 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -163,6 +163,9 @@ public class FunctionActioner implements AutoCloseable {
         instanceConfig.setInstanceId(String.valueOf(instanceId));
         instanceConfig.setMaxBufferedTuples(1024);
         
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+
+        log.info("start process with instance config {}", instanceConfig);
+
         RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, 
pkgFile.getAbsolutePath(),
                 runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
@@ -317,6 +320,13 @@ public class FunctionActioner implements AutoCloseable {
             SourceSpec.Builder sourceBuilder = 
SourceSpec.newBuilder(functionDetails.getSource());
             sourceBuilder.setTypeClassName(typeArg);
             functionDetails.setSource(sourceBuilder);
+
+            SinkSpec sinkSpec = functionDetails.getSink();
+            if (null == sinkSpec || 
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+                SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(sinkSpec);
+                sinkBuilder.setTypeClassName(typeArg);
+                functionDetails.setSink(sinkBuilder);
+            }
         }
     }
 
@@ -328,6 +338,13 @@ public class FunctionActioner implements AutoCloseable {
             SinkSpec.Builder sinkBuilder = 
SinkSpec.newBuilder(functionDetails.getSink());
             sinkBuilder.setTypeClassName(typeArg);
             functionDetails.setSink(sinkBuilder);
+
+            SourceSpec sourceSpec = functionDetails.getSource();
+            if (null == sourceSpec || 
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+                SourceSpec.Builder sourceBuilder = 
SourceSpec.newBuilder(sourceSpec);
+                sourceBuilder.setTypeClassName(typeArg);
+                functionDetails.setSource(sourceBuilder);
+            }
         }
     }
 
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
index ebfa000..b0d244b 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
@@ -45,4 +45,8 @@ public class CassandraContainer<SelfT extends 
ChaosContainer<SelfT>> extends Cha
             })
             .waitingFor(new HostPortWaitStrategy());
     }
+
+    public int getCassandraPort() {
+        return getMappedPort(PORT);
+    }
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 6983a1c..040d59d 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -178,7 +178,9 @@ public class PulsarCluster {
         Map<String, GenericContainer<?>> externalServices = 
spec.externalServices;
         if (null != externalServices) {
             externalServices.entrySet().forEach(service -> {
-                service.getValue().start();
+                GenericContainer<?> serviceContainer = service.getValue();
+                serviceContainer.withNetwork(network);
+                serviceContainer.start();
                 log.info("Successfully start external service {}.", 
service.getKey());
             });
         }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index 24e9184..b88dee7 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -65,13 +65,20 @@ public abstract class PulsarClusterTestBase {
     }
 
     public void setupCluster(String namePrefix) throws Exception {
-        PulsarClusterSpec spec = PulsarClusterSpec.builder()
-            .clusterName(Stream.of(this.getClass().getSimpleName(), 
namePrefix, randomName(5))
-                .filter(s -> s != null && !s.isEmpty())
-                .collect(joining("-")))
-            .build();
+        String clusterName = Stream.of(this.getClass().getSimpleName(), 
namePrefix, randomName(5))
+            .filter(s -> s != null && !s.isEmpty())
+            .collect(joining("-"));
 
-        setupCluster(spec);
+        PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder = 
PulsarClusterSpec.builder()
+            .clusterName(clusterName);
+
+        setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
+    }
+
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+        String clusterName,
+        PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        return specBuilder;
     }
 
     protected void setupCluster(PulsarClusterSpec spec) throws Exception {
@@ -93,7 +100,7 @@ public abstract class PulsarClusterTestBase {
 
     protected static String randomName(int numChars) {
         StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < 8; i++) {
+        for (int i = 0; i < numChars; i++) {
             sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
         }
         return sb.toString();
diff --git a/tests/integration/semantics/pom.xml 
b/tests/integration/semantics/pom.xml
index 63c1621..b98237a 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -53,5 +53,10 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8d22404..9da03af 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,17 +18,11 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
-import static java.util.stream.Collectors.joining;
-
-import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.pulsar.tests.integration.functions.runtime.PulsarFunctionsRuntimeTest;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
 /**
@@ -37,6 +31,14 @@ import org.testng.annotations.DataProvider;
 @Slf4j
 public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
 
+    @DataProvider(name = "FunctionRuntimeTypes")
+    public static Object[][] getData() {
+        return new Object[][] {
+            { FunctionRuntimeType.PROCESS },
+            { FunctionRuntimeType.THREAD }
+        };
+    }
+
     protected final FunctionRuntimeType functionRuntimeType;
 
     public PulsarFunctionsTestBase() {
@@ -47,18 +49,12 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarClusterTestBase  {
         this.functionRuntimeType = functionRuntimeType;
     }
 
-    @BeforeClass
-    @Override
-    public void setupCluster() throws Exception {
-        PulsarClusterSpec spec = PulsarClusterSpec.builder()
-            .clusterName(Stream.of(this.getClass().getSimpleName(), 
randomName(5))
-                .filter(s -> s != null && !s.isEmpty())
-                .collect(joining("-")))
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+        String clusterName,
+        PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        return super.beforeSetupCluster(clusterName, specBuilder)
             .functionRuntimeType(functionRuntimeType)
-            .numFunctionWorkers(2)
-            .build();
-
-        super.setupCluster(spec);
+            .numFunctionWorkers(2);
     }
 
     //
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index 6f49821..b2c85eb 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -43,14 +43,6 @@ import org.testng.annotations.Test;
  */
 public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
 
-    @DataProvider(name = "FunctionRuntimeTypes")
-    public static Object[][] getData() {
-        return new Object[][] {
-            { FunctionRuntimeType.PROCESS },
-            { FunctionRuntimeType.THREAD }
-        };
-    }
-
     @Factory(dataProvider = "FunctionRuntimeTypes")
     PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
         super(functionRuntimeType);
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
new file mode 100644
index 0000000..483b564
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.CassandraContainer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+
+/**
+ * A tester for testing cassandra sink.
+ */
+@Slf4j
+public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+
+    private static final String ROOTS = "cassandra";
+    private static final String KEY = "key";
+    private static final String COLUMN = "col";
+
+    private final String keySpace;
+    private final String tableName;
+
+    private CassandraContainer cassandraCluster;
+
+    private Cluster cluster;
+    private Session session;
+
+    public CassandraSinkTester() {
+        super("cassandra");
+
+        String suffix = TestUtils.randomName(8) + "_" + 
System.currentTimeMillis();
+        this.keySpace = "keySpace_" + suffix;
+        this.tableName = "tableName_" + suffix;
+
+        sinkConfig.put("roots", ROOTS);
+        sinkConfig.put("keyspace", keySpace);
+        sinkConfig.put("columnFamily", tableName);
+        sinkConfig.put("keyname", KEY);
+        sinkConfig.put("columnName", COLUMN);
+    }
+
+    @Override
+    protected CassandraContainer newSinkService(String clusterName) {
+        this.cassandraCluster = new CassandraContainer(clusterName);
+        return this.cassandraCluster;
+    }
+
+    @Override
+    protected void prepareSink() {
+        // build the sink
+        cluster = Cluster.builder()
+            .addContactPoint("localhost")
+            .withPort(cassandraCluster.getCassandraPort())
+            .build();
+
+        // connect to the cluster
+        session = cluster.connect();
+        log.info("Connecting to cassandra cluster at localhost:{}", 
cassandraCluster.getCassandraPort());
+
+        String createKeySpace =
+            "CREATE KEYSPACE " + keySpace
+                + " WITH replication = {'class':'SimpleStrategy', 
'replication_factor':1}; ";
+        log.info(createKeySpace);
+        session.execute(createKeySpace);
+        session.execute("USE " + keySpace);
+
+        String createTable = "CREATE TABLE " + tableName
+            + "(" + KEY + " text PRIMARY KEY, "
+            + COLUMN + " text);";
+        log.info(createTable);
+        session.execute(createTable);
+    }
+
+    @Override
+    protected void validateSinkResult(Map<String, String> kvs) {
+        String query = "SELECT * FROM " + tableName + ";";
+        ResultSet result = session.execute(query);
+        List<Row> rows = result.all();
+        assertEquals(kvs.size(), rows.size());
+        for (Row row : rows) {
+            String key = row.getString(KEY);
+            String value = row.getString(COLUMN);
+
+            String expectedValue = kvs.get(key);
+            assertNotNull(expectedValue);
+            assertEquals(expectedValue, value);
+        }
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
new file mode 100644
index 0000000..52719bf
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import 
org.apache.pulsar.tests.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * A test base for testing sink.
+ */
+@Slf4j
+public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
+
+    @DataProvider(name = "Sinks")
+    public static Object[][] getData() {
+        return new Object[][] {
+            { FunctionRuntimeType.PROCESS,  new CassandraSinkTester() },
+            { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+        };
+    }
+
+    protected final SinkTester tester;
+
+    @Factory(dataProvider = "Sinks")
+    PulsarIOSinkTest(FunctionRuntimeType functionRuntimeType, SinkTester 
tester) {
+        super(functionRuntimeType);
+        this.tester = tester;
+    }
+
+    @Override
+    protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
+                                                          
PulsarClusterSpecBuilder specBuilder) {
+        Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+        externalServices.put(tester.sinkType, 
tester.newSinkService(clusterName));
+        return super.beforeSetupCluster(clusterName, specBuilder)
+            .externalServices(externalServices);
+    }
+
+    @Test
+    public void testSink() throws Exception {
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String inputTopicName = "test-sink-connector-input-topic-" + 
randomName(8);
+        final String sinkName = "test-sink-connector-name-" + randomName(8);
+        final int numMessages = 20;
+
+        // prepare the testing environment for sink
+        prepareSink();
+
+        // submit the sink connector
+        submitSinkConnector(tenant, namespace, sinkName, inputTopicName);
+
+        // get sink info
+        getSinkInfoSuccess(tenant, namespace, sinkName);
+
+        // get sink status
+        getSinkStatus(tenant, namespace, sinkName);
+
+        // produce messages
+        Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, 
numMessages);
+
+        // wait for sink to process messages
+        waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+
+        // validate the sink result
+        tester.validateSinkResult(kvs);
+
+        // delete the sink
+        deleteSink(tenant, namespace, sinkName);
+
+        // get sink info (sink should be deleted)
+        getSinkInfoNotFound(tenant, namespace, sinkName);
+    }
+
+    protected void prepareSink() {
+        tester.prepareSink();
+    }
+
+    protected void submitSinkConnector(String tenant,
+                                       String namespace,
+                                       String sinkName,
+                                       String inputTopicName) throws Exception 
{
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "sink", "create",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName,
+            "--sink-type", tester.sinkType(),
+            "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+            "--inputs", inputTopicName
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("\"Created successfully\""),
+            result.getStdout());
+    }
+
+    protected void getSinkInfoSuccess(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        log.info("Get sink info : {}", result.getStdout());
+        assertTrue(
+            result.getStdout().contains("\"builtin\": \"cassandra\""),
+            result.getStdout()
+        );
+    }
+
+    protected void getSinkStatus(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        while (true) {
+            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            log.info("Get sink status : {}", result.getStdout());
+            if (result.getStdout().contains("\"running\": true")) {
+                return;
+            }
+            log.info("Backoff 1 second until the function is running");
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    protected Map<String, String> produceMessagesToInputTopic(String 
inputTopicName,
+                                                              int numMessages) 
throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopicName)
+            .create();
+        Map<String, String> kvs = Maps.newHashMap();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+            String value = "value-" + i;
+            kvs.put(key, value);
+            producer.newMessage()
+                .key(key)
+                .value(value)
+                .send();
+        }
+        return kvs;
+    }
+
+    protected void waitForProcessingMessages(String tenant,
+                                             String namespace,
+                                             String sinkName,
+                                             int numMessages) throws Exception 
{
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (true) {
+            ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+            log.info("Get sink status : {}", result.getStdout());
+            if (result.getStdout().contains("\"numProcessed\": \"" + 
numMessages + "\"")) {
+                return;
+            }
+            log.info("{} ms has elapsed but the sink hasn't process {} 
messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    protected void deleteSink(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "sink",
+            "delete",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("Deleted successfully"),
+            result.getStdout()
+        );
+        assertTrue(
+            result.getStderr().isEmpty(),
+            result.getStderr()
+        );
+    }
+
+    protected void getSinkInfoNotFound(String tenant, String namespace, String 
sinkName) throws Exception {
+        String[] commands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "get",
+            "--tenant", tenant,
+            "--namespace", namespace,
+            "--name", sinkName
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(result.getStderr().contains("Reason: Function " + sinkName 
+ " doesn't exist"));
+    }
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
new file mode 100644
index 0000000..dbc5884
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
+
+/**
+ * A tester used for testing a specific sink.
+ */
+public abstract class SinkTester<SINK_SERVICE_CONTAINER extends 
GenericContainer> {
+
+    protected final String sinkType;
+    protected final Map<String, Object> sinkConfig;
+
+    protected SinkTester(String sinkType) {
+        this.sinkType = sinkType;
+        this.sinkConfig = Maps.newHashMap();
+    }
+
+    protected abstract SINK_SERVICE_CONTAINER newSinkService(String 
clusterName);
+
+    protected String sinkType() {
+        return sinkType;
+    }
+
+    protected Map<String, Object> sinkConfig() {
+        return sinkConfig;
+    }
+
+    protected abstract void prepareSink();
+
+    protected abstract void validateSinkResult(Map<String, String> kvs);
+
+}
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
new file mode 100644
index 0000000..726e9f2
--- /dev/null
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class TestUtils {
+
+    private TestUtils() {}
+
+    public static String randomName(int numChars) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < numChars; i++) {
+            sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+        }
+        return sb.toString();
+    }
+
+}

Reply via email to