This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 555dd57 FunctionActioner should set the type class name for both
source and sink (#2167)
555dd57 is described below
commit 555dd57a424956adf1d5a3e46b927eed8660a1e9
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Jul 15 22:36:42 2018 -0700
FunctionActioner should set the type class name for both source and sink
(#2167)
* FunctionActioner should set the type class name for both source and sink
*Motivation*
The type class names are not set correctly for builtin connectors by
FunctionActioner.
*Changes*
Set the type class name inferred from NAR package in FunctionActioner.
* Add an integration for connectors
* Fix ProcessRuntimeTest
---
pom.xml | 6 +
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +-
.../pulsar/functions/runtime/ProcessRuntime.java | 4 +-
.../functions/runtime/ProcessRuntimeTest.java | 4 +-
.../pulsar/functions/worker/FunctionActioner.java | 17 ++
.../tests/containers/CassandraContainer.java | 4 +
.../pulsar/tests/topologies/PulsarCluster.java | 4 +-
.../tests/topologies/PulsarClusterTestBase.java | 21 +-
tests/integration/semantics/pom.xml | 5 +
.../functions/PulsarFunctionsTestBase.java | 30 ++-
.../runtime/PulsarFunctionsRuntimeTest.java | 8 -
.../tests/integration/io/CassandraSinkTester.java | 113 +++++++++
.../tests/integration/io/PulsarIOSinkTest.java | 254 +++++++++++++++++++++
.../pulsar/tests/integration/io/SinkTester.java | 52 +++++
.../pulsar/tests/integration/utils/TestUtils.java | 35 +++
15 files changed, 521 insertions(+), 38 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7b59f99..2275471 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@ flexible messaging model and an intuitive client
API.</description>
<!-- test dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
<arquillian-junit.version>1.1.14.Final</arquillian-junit.version>
+ <cassandra.version>3.5.0</cassandra.version>
<disruptor.version>3.4.0</disruptor.version>
<testcontainers.version>1.8.0</testcontainers.version>
@@ -807,6 +808,11 @@ flexible messaging model and an intuitive client
API.</description>
<artifactId>arquillian-junit-standalone</artifactId>
<version>${arquillian-junit.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${cassandra.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 81c40cc..2a38c81 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1058,7 +1058,7 @@ public class CmdFunctions extends CmdBase {
});
for (RuntimeSpawner spawner : spawners) {
spawner.join();
- log.info("RuntimeSpawner quit because of {}",
spawner.getRuntime().getDeathException());
+ log.info("RuntimeSpawner quit because of",
spawner.getRuntime().getDeathException());
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 95d7a53..8054ff8 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -191,7 +191,7 @@ class ProcessRuntime implements Runtime {
if
(instanceConfig.getFunctionDetails().getSource().getTypeClassName() != null
&&
!instanceConfig.getFunctionDetails().getSource().getTypeClassName().isEmpty()) {
args.add("--source_type_classname");
-
args.add(instanceConfig.getFunctionDetails().getSource().getTypeClassName());
+ args.add("\"" +
instanceConfig.getFunctionDetails().getSource().getTypeClassName() + "\"");
}
}
@@ -222,7 +222,7 @@ class ProcessRuntime implements Runtime {
if
(instanceConfig.getFunctionDetails().getSink().getTypeClassName() != null
&&
!instanceConfig.getFunctionDetails().getSink().getTypeClassName().isEmpty()) {
args.add("--sink_type_classname");
-
args.add(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
+ args.add("\"" +
instanceConfig.getFunctionDetails().getSink().getTypeClassName() + "\"");
}
}
if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index bad505d..d9cef64 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -136,12 +136,12 @@ public class ProcessRuntimeTest {
+ " --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(35)
+ " --source_classname " +
config.getFunctionDetails().getSource().getClassName()
- + " --source_type_classname " +
config.getFunctionDetails().getSource().getTypeClassName()
+ + " --source_type_classname \"" +
config.getFunctionDetails().getSource().getTypeClassName() + "\""
+ " --source_subscription_type " +
config.getFunctionDetails().getSource().getSubscriptionType().name()
+ " --source_topics_serde_classname " + new
Gson().toJson(topicsToSerDeClassName)
+ " --topics_pattern " +
config.getFunctionDetails().getSource().getTopicsPattern()
+ " --sink_classname " +
config.getFunctionDetails().getSink().getClassName()
- + " --sink_type_classname " +
config.getFunctionDetails().getSink().getTypeClassName()
+ + " --sink_type_classname \"" +
config.getFunctionDetails().getSink().getTypeClassName() + "\""
+ " --sink_topic " +
config.getFunctionDetails().getSink().getTopic()
+ " --sink_serde_classname " +
config.getFunctionDetails().getSink().getSerDeClassName()
+ " --state_storage_serviceurl " + stateStorageServiceUrl;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 8ad2921..0927360 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -163,6 +163,9 @@ public class FunctionActioner implements AutoCloseable {
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+
+ log.info("start process with instance config {}", instanceConfig);
+
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig,
pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
@@ -317,6 +320,13 @@ public class FunctionActioner implements AutoCloseable {
SourceSpec.Builder sourceBuilder =
SourceSpec.newBuilder(functionDetails.getSource());
sourceBuilder.setTypeClassName(typeArg);
functionDetails.setSource(sourceBuilder);
+
+ SinkSpec sinkSpec = functionDetails.getSink();
+ if (null == sinkSpec ||
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+ SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(sinkSpec);
+ sinkBuilder.setTypeClassName(typeArg);
+ functionDetails.setSink(sinkBuilder);
+ }
}
}
@@ -328,6 +338,13 @@ public class FunctionActioner implements AutoCloseable {
SinkSpec.Builder sinkBuilder =
SinkSpec.newBuilder(functionDetails.getSink());
sinkBuilder.setTypeClassName(typeArg);
functionDetails.setSink(sinkBuilder);
+
+ SourceSpec sourceSpec = functionDetails.getSource();
+ if (null == sourceSpec ||
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+ SourceSpec.Builder sourceBuilder =
SourceSpec.newBuilder(sourceSpec);
+ sourceBuilder.setTypeClassName(typeArg);
+ functionDetails.setSource(sourceBuilder);
+ }
}
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
index ebfa000..b0d244b 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
@@ -45,4 +45,8 @@ public class CassandraContainer<SelfT extends
ChaosContainer<SelfT>> extends Cha
})
.waitingFor(new HostPortWaitStrategy());
}
+
+ public int getCassandraPort() {
+ return getMappedPort(PORT);
+ }
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 6983a1c..040d59d 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -178,7 +178,9 @@ public class PulsarCluster {
Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
if (null != externalServices) {
externalServices.entrySet().forEach(service -> {
- service.getValue().start();
+ GenericContainer<?> serviceContainer = service.getValue();
+ serviceContainer.withNetwork(network);
+ serviceContainer.start();
log.info("Successfully start external service {}.",
service.getKey());
});
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index 24e9184..b88dee7 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -65,13 +65,20 @@ public abstract class PulsarClusterTestBase {
}
public void setupCluster(String namePrefix) throws Exception {
- PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .clusterName(Stream.of(this.getClass().getSimpleName(),
namePrefix, randomName(5))
- .filter(s -> s != null && !s.isEmpty())
- .collect(joining("-")))
- .build();
+ String clusterName = Stream.of(this.getClass().getSimpleName(),
namePrefix, randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
- setupCluster(spec);
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder =
PulsarClusterSpec.builder()
+ .clusterName(clusterName);
+
+ setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
+ }
+
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ return specBuilder;
}
protected void setupCluster(PulsarClusterSpec spec) throws Exception {
@@ -93,7 +100,7 @@ public abstract class PulsarClusterTestBase {
protected static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 8; i++) {
+ for (int i = 0; i < numChars; i++) {
sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
}
return sb.toString();
diff --git a/tests/integration/semantics/pom.xml
b/tests/integration/semantics/pom.xml
index 63c1621..b98237a 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -53,5 +53,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8d22404..9da03af 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,17 +18,11 @@
*/
package org.apache.pulsar.tests.integration.functions;
-import static java.util.stream.Collectors.joining;
-
-import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.pulsar.tests.integration.functions.runtime.PulsarFunctionsRuntimeTest;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
/**
@@ -37,6 +31,14 @@ import org.testng.annotations.DataProvider;
@Slf4j
public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+ @DataProvider(name = "FunctionRuntimeTypes")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS },
+ { FunctionRuntimeType.THREAD }
+ };
+ }
+
protected final FunctionRuntimeType functionRuntimeType;
public PulsarFunctionsTestBase() {
@@ -47,18 +49,12 @@ public abstract class PulsarFunctionsTestBase extends
PulsarClusterTestBase {
this.functionRuntimeType = functionRuntimeType;
}
- @BeforeClass
- @Override
- public void setupCluster() throws Exception {
- PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .clusterName(Stream.of(this.getClass().getSimpleName(),
randomName(5))
- .filter(s -> s != null && !s.isEmpty())
- .collect(joining("-")))
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ return super.beforeSetupCluster(clusterName, specBuilder)
.functionRuntimeType(functionRuntimeType)
- .numFunctionWorkers(2)
- .build();
-
- super.setupCluster(spec);
+ .numFunctionWorkers(2);
}
//
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index 6f49821..b2c85eb 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -43,14 +43,6 @@ import org.testng.annotations.Test;
*/
public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
- @DataProvider(name = "FunctionRuntimeTypes")
- public static Object[][] getData() {
- return new Object[][] {
- { FunctionRuntimeType.PROCESS },
- { FunctionRuntimeType.THREAD }
- };
- }
-
@Factory(dataProvider = "FunctionRuntimeTypes")
PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
super(functionRuntimeType);
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
new file mode 100644
index 0000000..483b564
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.CassandraContainer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+
+/**
+ * A tester for testing cassandra sink.
+ */
+@Slf4j
+public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+
+ private static final String ROOTS = "cassandra";
+ private static final String KEY = "key";
+ private static final String COLUMN = "col";
+
+ private final String keySpace;
+ private final String tableName;
+
+ private CassandraContainer cassandraCluster;
+
+ private Cluster cluster;
+ private Session session;
+
+ public CassandraSinkTester() {
+ super("cassandra");
+
+ String suffix = TestUtils.randomName(8) + "_" +
System.currentTimeMillis();
+ this.keySpace = "keySpace_" + suffix;
+ this.tableName = "tableName_" + suffix;
+
+ sinkConfig.put("roots", ROOTS);
+ sinkConfig.put("keyspace", keySpace);
+ sinkConfig.put("columnFamily", tableName);
+ sinkConfig.put("keyname", KEY);
+ sinkConfig.put("columnName", COLUMN);
+ }
+
+ @Override
+ protected CassandraContainer newSinkService(String clusterName) {
+ this.cassandraCluster = new CassandraContainer(clusterName);
+ return this.cassandraCluster;
+ }
+
+ @Override
+ protected void prepareSink() {
+ // build the sink
+ cluster = Cluster.builder()
+ .addContactPoint("localhost")
+ .withPort(cassandraCluster.getCassandraPort())
+ .build();
+
+ // connect to the cluster
+ session = cluster.connect();
+ log.info("Connecting to cassandra cluster at localhost:{}",
cassandraCluster.getCassandraPort());
+
+ String createKeySpace =
+ "CREATE KEYSPACE " + keySpace
+ + " WITH replication = {'class':'SimpleStrategy',
'replication_factor':1}; ";
+ log.info(createKeySpace);
+ session.execute(createKeySpace);
+ session.execute("USE " + keySpace);
+
+ String createTable = "CREATE TABLE " + tableName
+ + "(" + KEY + " text PRIMARY KEY, "
+ + COLUMN + " text);";
+ log.info(createTable);
+ session.execute(createTable);
+ }
+
+ @Override
+ protected void validateSinkResult(Map<String, String> kvs) {
+ String query = "SELECT * FROM " + tableName + ";";
+ ResultSet result = session.execute(query);
+ List<Row> rows = result.all();
+ assertEquals(kvs.size(), rows.size());
+ for (Row row : rows) {
+ String key = row.getString(KEY);
+ String value = row.getString(COLUMN);
+
+ String expectedValue = kvs.get(key);
+ assertNotNull(expectedValue);
+ assertEquals(expectedValue, value);
+ }
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
new file mode 100644
index 0000000..52719bf
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import
org.apache.pulsar.tests.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * A test base for testing sink.
+ */
+@Slf4j
+public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
+
+ @DataProvider(name = "Sinks")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS, new CassandraSinkTester() },
+ { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+ };
+ }
+
+ protected final SinkTester tester;
+
+ @Factory(dataProvider = "Sinks")
+ PulsarIOSinkTest(FunctionRuntimeType functionRuntimeType, SinkTester
tester) {
+ super(functionRuntimeType);
+ this.tester = tester;
+ }
+
+ @Override
+ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
+
PulsarClusterSpecBuilder specBuilder) {
+ Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+ externalServices.put(tester.sinkType,
tester.newSinkService(clusterName));
+ return super.beforeSetupCluster(clusterName, specBuilder)
+ .externalServices(externalServices);
+ }
+
+ @Test
+ public void testSink() throws Exception {
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String inputTopicName = "test-sink-connector-input-topic-" +
randomName(8);
+ final String sinkName = "test-sink-connector-name-" + randomName(8);
+ final int numMessages = 20;
+
+ // prepare the testing environment for sink
+ prepareSink();
+
+ // submit the sink connector
+ submitSinkConnector(tenant, namespace, sinkName, inputTopicName);
+
+ // get sink info
+ getSinkInfoSuccess(tenant, namespace, sinkName);
+
+ // get sink status
+ getSinkStatus(tenant, namespace, sinkName);
+
+ // produce messages
+ Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName,
numMessages);
+
+ // wait for sink to process messages
+ waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+
+ // validate the sink result
+ tester.validateSinkResult(kvs);
+
+ // delete the sink
+ deleteSink(tenant, namespace, sinkName);
+
+ // get sink info (sink should be deleted)
+ getSinkInfoNotFound(tenant, namespace, sinkName);
+ }
+
+ protected void prepareSink() {
+ tester.prepareSink();
+ }
+
+ protected void submitSinkConnector(String tenant,
+ String namespace,
+ String sinkName,
+ String inputTopicName) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--sink-type", tester.sinkType(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ protected void getSinkInfoSuccess(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"cassandra\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSinkStatus(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
+ if (result.getStdout().contains("\"running\": true")) {
+ return;
+ }
+ log.info("Backoff 1 second until the function is running");
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected Map<String, String> produceMessagesToInputTopic(String
inputTopicName,
+ int numMessages)
throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopicName)
+ .create();
+ Map<String, String> kvs = Maps.newHashMap();
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+ String value = "value-" + i;
+ kvs.put(key, value);
+ producer.newMessage()
+ .key(key)
+ .value(value)
+ .send();
+ }
+ return kvs;
+ }
+
+ protected void waitForProcessingMessages(String tenant,
+ String namespace,
+ String sinkName,
+ int numMessages) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
+ if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ return;
+ }
+ log.info("{} ms has elapsed but the sink hasn't process {}
messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void deleteSink(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Deleted successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSinkInfoNotFound(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(result.getStderr().contains("Reason: Function " + sinkName
+ " doesn't exist"));
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
new file mode 100644
index 0000000..dbc5884
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
+
+/**
+ * A tester used for testing a specific sink.
+ */
+public abstract class SinkTester<SINK_SERVICE_CONTAINER extends
GenericContainer> {
+
+ protected final String sinkType;
+ protected final Map<String, Object> sinkConfig;
+
+ protected SinkTester(String sinkType) {
+ this.sinkType = sinkType;
+ this.sinkConfig = Maps.newHashMap();
+ }
+
+ protected abstract SINK_SERVICE_CONTAINER newSinkService(String
clusterName);
+
+ protected String sinkType() {
+ return sinkType;
+ }
+
+ protected Map<String, Object> sinkConfig() {
+ return sinkConfig;
+ }
+
+ protected abstract void prepareSink();
+
+ protected abstract void validateSinkResult(Map<String, String> kvs);
+
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
new file mode 100644
index 0000000..726e9f2
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class TestUtils {
+
+ private TestUtils() {}
+
+ public static String randomName(int numChars) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numChars; i++) {
+ sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+ }
+ return sb.toString();
+ }
+
+}