merlimat closed pull request #2167: FunctionActioner should set the type class
name for both source and sink
URL: https://github.com/apache/incubator-pulsar/pull/2167
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/pom.xml b/pom.xml
index 7b59f997e6..2275471c3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@ flexible messaging model and an intuitive client
API.</description>
<!-- test dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
<arquillian-junit.version>1.1.14.Final</arquillian-junit.version>
+ <cassandra.version>3.5.0</cassandra.version>
<disruptor.version>3.4.0</disruptor.version>
<testcontainers.version>1.8.0</testcontainers.version>
@@ -807,6 +808,11 @@ flexible messaging model and an intuitive client
API.</description>
<artifactId>arquillian-junit-standalone</artifactId>
<version>${arquillian-junit.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${cassandra.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 81c40cc9ab..2a38c81f93 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1058,7 +1058,7 @@ public void run() {
});
for (RuntimeSpawner spawner : spawners) {
spawner.join();
- log.info("RuntimeSpawner quit because of {}",
spawner.getRuntime().getDeathException());
+ log.info("RuntimeSpawner quit because of",
spawner.getRuntime().getDeathException());
}
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 95d7a53f47..8054ff8bb0 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -191,7 +191,7 @@
if
(instanceConfig.getFunctionDetails().getSource().getTypeClassName() != null
&&
!instanceConfig.getFunctionDetails().getSource().getTypeClassName().isEmpty()) {
args.add("--source_type_classname");
-
args.add(instanceConfig.getFunctionDetails().getSource().getTypeClassName());
+ args.add("\"" +
instanceConfig.getFunctionDetails().getSource().getTypeClassName() + "\"");
}
}
@@ -222,7 +222,7 @@
if
(instanceConfig.getFunctionDetails().getSink().getTypeClassName() != null
&&
!instanceConfig.getFunctionDetails().getSink().getTypeClassName().isEmpty()) {
args.add("--sink_type_classname");
-
args.add(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
+ args.add("\"" +
instanceConfig.getFunctionDetails().getSink().getTypeClassName() + "\"");
}
}
if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index bad505d600..d9cef64829 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -136,12 +136,12 @@ public void testJavaConstructor() {
+ " --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(35)
+ " --source_classname " +
config.getFunctionDetails().getSource().getClassName()
- + " --source_type_classname " +
config.getFunctionDetails().getSource().getTypeClassName()
+ + " --source_type_classname \"" +
config.getFunctionDetails().getSource().getTypeClassName() + "\""
+ " --source_subscription_type " +
config.getFunctionDetails().getSource().getSubscriptionType().name()
+ " --source_topics_serde_classname " + new
Gson().toJson(topicsToSerDeClassName)
+ " --topics_pattern " +
config.getFunctionDetails().getSource().getTopicsPattern()
+ " --sink_classname " +
config.getFunctionDetails().getSink().getClassName()
- + " --sink_type_classname " +
config.getFunctionDetails().getSink().getTypeClassName()
+ + " --sink_type_classname \"" +
config.getFunctionDetails().getSink().getTypeClassName() + "\""
+ " --sink_topic " +
config.getFunctionDetails().getSink().getTopic()
+ " --sink_serde_classname " +
config.getFunctionDetails().getSink().getSerDeClassName()
+ " --state_storage_serviceurl " + stateStorageServiceUrl;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 8ad292110d..09273606ab 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -163,6 +163,9 @@ protected void startFunction(FunctionRuntimeInfo
functionRuntimeInfo) throws Exc
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+
+ log.info("start process with instance config {}", instanceConfig);
+
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig,
pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
@@ -317,6 +320,13 @@ private void fillSourceTypeClass(FunctionDetails.Builder
functionDetails, File a
SourceSpec.Builder sourceBuilder =
SourceSpec.newBuilder(functionDetails.getSource());
sourceBuilder.setTypeClassName(typeArg);
functionDetails.setSource(sourceBuilder);
+
+ SinkSpec sinkSpec = functionDetails.getSink();
+ if (null == sinkSpec ||
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+ SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(sinkSpec);
+ sinkBuilder.setTypeClassName(typeArg);
+ functionDetails.setSink(sinkBuilder);
+ }
}
}
@@ -328,6 +338,13 @@ private void fillSinkTypeClass(FunctionDetails.Builder
functionDetails, File arc
SinkSpec.Builder sinkBuilder =
SinkSpec.newBuilder(functionDetails.getSink());
sinkBuilder.setTypeClassName(typeArg);
functionDetails.setSink(sinkBuilder);
+
+ SourceSpec sourceSpec = functionDetails.getSource();
+ if (null == sourceSpec ||
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+ SourceSpec.Builder sourceBuilder =
SourceSpec.newBuilder(sourceSpec);
+ sourceBuilder.setTypeClassName(typeArg);
+ functionDetails.setSource(sourceBuilder);
+ }
}
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
index ebfa000764..b0d244b6d7 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/containers/CassandraContainer.java
@@ -45,4 +45,8 @@ protected void configure() {
})
.waitingFor(new HostPortWaitStrategy());
}
+
+ public int getCassandraPort() {
+ return getMappedPort(PORT);
+ }
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index 6983a1c00c..040d59dad1 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -178,7 +178,9 @@ public void start() throws Exception {
Map<String, GenericContainer<?>> externalServices =
spec.externalServices;
if (null != externalServices) {
externalServices.entrySet().forEach(service -> {
- service.getValue().start();
+ GenericContainer<?> serviceContainer = service.getValue();
+ serviceContainer.withNetwork(network);
+ serviceContainer.start();
log.info("Successfully start external service {}.",
service.getKey());
});
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index 24e91842e5..b88dee768b 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -65,13 +65,20 @@ public void setupCluster() throws Exception {
}
public void setupCluster(String namePrefix) throws Exception {
- PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .clusterName(Stream.of(this.getClass().getSimpleName(),
namePrefix, randomName(5))
- .filter(s -> s != null && !s.isEmpty())
- .collect(joining("-")))
- .build();
+ String clusterName = Stream.of(this.getClass().getSimpleName(),
namePrefix, randomName(5))
+ .filter(s -> s != null && !s.isEmpty())
+ .collect(joining("-"));
- setupCluster(spec);
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder =
PulsarClusterSpec.builder()
+ .clusterName(clusterName);
+
+ setupCluster(beforeSetupCluster(clusterName, specBuilder).build());
+ }
+
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ return specBuilder;
}
protected void setupCluster(PulsarClusterSpec spec) throws Exception {
@@ -93,7 +100,7 @@ public void tearDownCluster() {
protected static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 8; i++) {
+ for (int i = 0; i < numChars; i++) {
sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
}
return sb.toString();
diff --git a/tests/integration/semantics/pom.xml
b/tests/integration/semantics/pom.xml
index 63c16219ed..b98237ada4 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -53,5 +53,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8d224043e1..9da03afe64 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,17 +18,11 @@
*/
package org.apache.pulsar.tests.integration.functions;
-import static java.util.stream.Collectors.joining;
-
-import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.pulsar.tests.integration.functions.runtime.PulsarFunctionsRuntimeTest;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.Container.ExecResult;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
/**
@@ -37,6 +31,14 @@
@Slf4j
public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+ @DataProvider(name = "FunctionRuntimeTypes")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS },
+ { FunctionRuntimeType.THREAD }
+ };
+ }
+
protected final FunctionRuntimeType functionRuntimeType;
public PulsarFunctionsTestBase() {
@@ -47,18 +49,12 @@ protected PulsarFunctionsTestBase(FunctionRuntimeType
functionRuntimeType) {
this.functionRuntimeType = functionRuntimeType;
}
- @BeforeClass
- @Override
- public void setupCluster() throws Exception {
- PulsarClusterSpec spec = PulsarClusterSpec.builder()
- .clusterName(Stream.of(this.getClass().getSimpleName(),
randomName(5))
- .filter(s -> s != null && !s.isEmpty())
- .collect(joining("-")))
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ return super.beforeSetupCluster(clusterName, specBuilder)
.functionRuntimeType(functionRuntimeType)
- .numFunctionWorkers(2)
- .build();
-
- super.setupCluster(spec);
+ .numFunctionWorkers(2);
}
//
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index 6f498218ba..b2c85eb3ed 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -43,14 +43,6 @@
*/
public class PulsarFunctionsRuntimeTest extends PulsarFunctionsTestBase {
- @DataProvider(name = "FunctionRuntimeTypes")
- public static Object[][] getData() {
- return new Object[][] {
- { FunctionRuntimeType.PROCESS },
- { FunctionRuntimeType.THREAD }
- };
- }
-
@Factory(dataProvider = "FunctionRuntimeTypes")
PulsarFunctionsRuntimeTest(FunctionRuntimeType functionRuntimeType) {
super(functionRuntimeType);
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
new file mode 100644
index 0000000000..483b564d40
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.containers.CassandraContainer;
+import org.apache.pulsar.tests.integration.utils.TestUtils;
+
+/**
+ * A tester for testing cassandra sink.
+ */
+@Slf4j
+public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+
+ private static final String ROOTS = "cassandra";
+ private static final String KEY = "key";
+ private static final String COLUMN = "col";
+
+ private final String keySpace;
+ private final String tableName;
+
+ private CassandraContainer cassandraCluster;
+
+ private Cluster cluster;
+ private Session session;
+
+ public CassandraSinkTester() {
+ super("cassandra");
+
+ String suffix = TestUtils.randomName(8) + "_" +
System.currentTimeMillis();
+ this.keySpace = "keySpace_" + suffix;
+ this.tableName = "tableName_" + suffix;
+
+ sinkConfig.put("roots", ROOTS);
+ sinkConfig.put("keyspace", keySpace);
+ sinkConfig.put("columnFamily", tableName);
+ sinkConfig.put("keyname", KEY);
+ sinkConfig.put("columnName", COLUMN);
+ }
+
+ @Override
+ protected CassandraContainer newSinkService(String clusterName) {
+ this.cassandraCluster = new CassandraContainer(clusterName);
+ return this.cassandraCluster;
+ }
+
+ @Override
+ protected void prepareSink() {
+ // build the sink
+ cluster = Cluster.builder()
+ .addContactPoint("localhost")
+ .withPort(cassandraCluster.getCassandraPort())
+ .build();
+
+ // connect to the cluster
+ session = cluster.connect();
+ log.info("Connecting to cassandra cluster at localhost:{}",
cassandraCluster.getCassandraPort());
+
+ String createKeySpace =
+ "CREATE KEYSPACE " + keySpace
+ + " WITH replication = {'class':'SimpleStrategy',
'replication_factor':1}; ";
+ log.info(createKeySpace);
+ session.execute(createKeySpace);
+ session.execute("USE " + keySpace);
+
+ String createTable = "CREATE TABLE " + tableName
+ + "(" + KEY + " text PRIMARY KEY, "
+ + COLUMN + " text);";
+ log.info(createTable);
+ session.execute(createTable);
+ }
+
+ @Override
+ protected void validateSinkResult(Map<String, String> kvs) {
+ String query = "SELECT * FROM " + tableName + ";";
+ ResultSet result = session.execute(query);
+ List<Row> rows = result.all();
+ assertEquals(kvs.size(), rows.size());
+ for (Row row : rows) {
+ String key = row.getString(KEY);
+ String value = row.getString(COLUMN);
+
+ String expectedValue = kvs.get(key);
+ assertNotNull(expectedValue);
+ assertEquals(expectedValue, value);
+ }
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
new file mode 100644
index 0000000000..52719bff70
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOSinkTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Stopwatch;
+import com.google.gson.Gson;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
+import org.apache.pulsar.tests.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+import
org.apache.pulsar.tests.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+/**
+ * A test base for testing sink.
+ */
+@Slf4j
+public class PulsarIOSinkTest extends PulsarFunctionsTestBase {
+
+ @DataProvider(name = "Sinks")
+ public static Object[][] getData() {
+ return new Object[][] {
+ { FunctionRuntimeType.PROCESS, new CassandraSinkTester() },
+ { FunctionRuntimeType.THREAD, new CassandraSinkTester() }
+ };
+ }
+
+ protected final SinkTester tester;
+
+ @Factory(dataProvider = "Sinks")
+ PulsarIOSinkTest(FunctionRuntimeType functionRuntimeType, SinkTester
tester) {
+ super(functionRuntimeType);
+ this.tester = tester;
+ }
+
+ @Override
+ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName,
+
PulsarClusterSpecBuilder specBuilder) {
+ Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+ externalServices.put(tester.sinkType,
tester.newSinkService(clusterName));
+ return super.beforeSetupCluster(clusterName, specBuilder)
+ .externalServices(externalServices);
+ }
+
+ @Test
+ public void testSink() throws Exception {
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String inputTopicName = "test-sink-connector-input-topic-" +
randomName(8);
+ final String sinkName = "test-sink-connector-name-" + randomName(8);
+ final int numMessages = 20;
+
+ // prepare the testing environment for sink
+ prepareSink();
+
+ // submit the sink connector
+ submitSinkConnector(tenant, namespace, sinkName, inputTopicName);
+
+ // get sink info
+ getSinkInfoSuccess(tenant, namespace, sinkName);
+
+ // get sink status
+ getSinkStatus(tenant, namespace, sinkName);
+
+ // produce messages
+ Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName,
numMessages);
+
+ // wait for sink to process messages
+ waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+
+ // validate the sink result
+ tester.validateSinkResult(kvs);
+
+ // delete the sink
+ deleteSink(tenant, namespace, sinkName);
+
+ // get sink info (sink should be deleted)
+ getSinkInfoNotFound(tenant, namespace, sinkName);
+ }
+
+ protected void prepareSink() {
+ tester.prepareSink();
+ }
+
+ protected void submitSinkConnector(String tenant,
+ String namespace,
+ String sinkName,
+ String inputTopicName) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--sink-type", tester.sinkType(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ protected void getSinkInfoSuccess(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink info : {}", result.getStdout());
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"cassandra\""),
+ result.getStdout()
+ );
+ }
+
+ protected void getSinkStatus(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
+ if (result.getStdout().contains("\"running\": true")) {
+ return;
+ }
+ log.info("Backoff 1 second until the function is running");
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected Map<String, String> produceMessagesToInputTopic(String
inputTopicName,
+ int numMessages)
throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopicName)
+ .create();
+ Map<String, String> kvs = Maps.newHashMap();
+ for (int i = 0; i < numMessages; i++) {
+ String key = "key-" + i;
+ String value = "value-" + i;
+ kvs.put(key, value);
+ producer.newMessage()
+ .key(key)
+ .value(value)
+ .send();
+ }
+ return kvs;
+ }
+
+ protected void waitForProcessingMessages(String tenant,
+ String namespace,
+ String sinkName,
+ int numMessages) throws Exception
{
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (true) {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ log.info("Get sink status : {}", result.getStdout());
+ if (result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\"")) {
+ return;
+ }
+ log.info("{} ms has elapsed but the sink hasn't process {}
messages, backoff to wait for another 1 second",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ protected void deleteSink(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink",
+ "delete",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("Deleted successfully"),
+ result.getStdout()
+ );
+ assertTrue(
+ result.getStderr().isEmpty(),
+ result.getStderr()
+ );
+ }
+
+ protected void getSinkInfoNotFound(String tenant, String namespace, String
sinkName) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+ assertTrue(result.getStderr().contains("Reason: Function " + sinkName
+ " doesn't exist"));
+ }
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
new file mode 100644
index 0000000000..dbc5884881
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.collections.Maps;
+
+/**
+ * A tester used for testing a specific sink.
+ */
+public abstract class SinkTester<SINK_SERVICE_CONTAINER extends
GenericContainer> {
+
+ protected final String sinkType;
+ protected final Map<String, Object> sinkConfig;
+
+ protected SinkTester(String sinkType) {
+ this.sinkType = sinkType;
+ this.sinkConfig = Maps.newHashMap();
+ }
+
+ protected abstract SINK_SERVICE_CONTAINER newSinkService(String
clusterName);
+
+ protected String sinkType() {
+ return sinkType;
+ }
+
+ protected Map<String, Object> sinkConfig() {
+ return sinkConfig;
+ }
+
+ protected abstract void prepareSink();
+
+ protected abstract void validateSinkResult(Map<String, String> kvs);
+
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
new file mode 100644
index 0000000000..726e9f208f
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/utils/TestUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class TestUtils {
+
+ private TestUtils() {}
+
+ public static String randomName(int numChars) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numChars; i++) {
+ sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+ }
+ return sb.toString();
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services