This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 791b596 Add integration tests on pulsar functions on process mode
(#2063)
791b596 is described below
commit 791b5963d08fa002659706e28e73abdf8f80aa56
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 2 10:22:17 2018 -0700
Add integration tests on pulsar functions on process mode (#2063)
---
tests/integration/semantics/pom.xml | 4 +
.../integration/functions/PulsarFunctionsTest.java | 138 ++++++++++-
.../functions/PulsarFunctionsTestBase.java | 12 +
.../functions/utils/CommandGenerator.java | 260 +++++++++++++++++++++
4 files changed, 412 insertions(+), 2 deletions(-)
diff --git a/tests/integration/semantics/pom.xml
b/tests/integration/semantics/pom.xml
index 5e8eeca..63c1621 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -38,6 +38,10 @@
<dependencies>
<dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 07d86f7..a35d466 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,12 +18,20 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import com.google.common.io.Files;
-import java.io.File;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
@@ -32,6 +40,10 @@ import org.testng.annotations.Test;
@Slf4j
public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+ //
+ // Tests on uploading/downloading function packages.
+ //
+
@Test
public String checkUpload() throws Exception {
String bkPkgPath = String.format("%s/%s/%s",
@@ -82,4 +94,126 @@ public class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
assertTrue(output.getStderr().isEmpty());
}
+ //
+ // Test CRUD functions on different runtimes.
+ //
+
+ @Test(dataProvider = "FunctionRuntimes")
+ public void testExclamationFunction(Runtime runtime) throws Exception {
+ String inputTopicName = "test-exclamation-" + runtime + "-input-" +
randomName(8);
+ String outputTopicName = "test-exclamation-" + runtime + "-output-" +
randomName(8);
+ String functionName = "test-exclamation-fn-" + randomName(8);
+ final int numMessages = 10;
+
+ // submit the exclamation function
+ submitExclamationFunction(
+ inputTopicName, outputTopicName, functionName);
+
+ // get function info
+ getFunctionInfoSuccess(functionName);
+
+ // publish and consume result
+ publishAndConsumeMessages(inputTopicName, outputTopicName,
numMessages);
+
+ // get function status
+ getFunctionStatus(functionName, numMessages);
+
+ // delete function
+ deleteFunction(functionName);
+
+ // get function info
+ getFunctionInfoNotFound(functionName);
+ }
+
+ private static void submitExclamationFunction(String inputTopicName,
+ String outputTopicName,
+ String functionName) throws
Exception {
+ CommandGenerator generator =
CommandGenerator.createDefaultGenerator(inputTopicName, EXCLAMATION_FUNC_CLASS);
+ generator.setSinkTopic(outputTopicName);
+ generator.setFunctionName(functionName);
+ String command = generator.generateCreateFunctionCommand();
+ String[] commands = {
+ "sh", "-c", command
+ };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ commands);
+ assertTrue(result.getStdout().contains("\"Created successfully\""));
+ }
+
+ private static void getFunctionInfoSuccess(String functionName) throws
Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
+ }
+
+ private static void getFunctionInfoNotFound(String functionName) throws
Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStderr().contains("Reason: Function " +
functionName + " doesn't exist"));
+ }
+
+ private static void getFunctionStatus(String functionName, int
numMessages) throws Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "getstatus",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("\"running\": true"));
+ assertTrue(result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\""));
+ assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\""));
+ }
+
+ private static void publishAndConsumeMessages(String inputTopic,
+ String outputTopic,
+ int numMessages) throws
Exception {
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(outputTopic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+ @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopic)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("message-" + i);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals("message-" + i + "!", msg.getValue());
+ }
+ }
+
+ private static void deleteFunction(String functionName) throws Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "delete",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("Deleted successfully"));
+ assertTrue(result.getStderr().isEmpty());
+ }
+
}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8e692ad..c0acd1d 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,14 +18,19 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
/**
* A cluster to run pulsar functions for testing functions related features.
*/
public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+ public static final String EXCLAMATION_FUNC_CLASS =
+ "org.apache.pulsar.functions.api.examples.ExclamationFunction";
+
@BeforeClass
public static void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster();
@@ -33,4 +38,11 @@ public class PulsarFunctionsTestBase extends
PulsarClusterTestBase {
pulsarCluster.startFunctionWorkers(1);
}
+ @DataProvider(name = "FunctionRuntimes")
+ public static Object[][] functionRuntimes() {
+ return new Object[][] {
+ new Object[] { Runtime.JAVA }
+ };
+ }
+
}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
new file mode 100644
index 0000000..12225fe
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import com.google.gson.Gson;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+@Getter
+@Setter
+@ToString
+public class CommandGenerator {
+ public enum Runtime {
+ JAVA,
+ PYTHON,
+ };
+ private String functionName;
+ private String tenant = "public";
+ private String namespace = "default";
+ private String functionClassName;
+ private String sourceTopic;
+ private Map<String, String> customSereSourceTopics;
+ private String sinkTopic;
+ private String logTopic;
+ private String outputSerDe;
+ private String processingGuarantees;
+ private Runtime runtime;
+ private Integer parallelism;
+ private String adminUrl;
+ private Integer windowLengthCount;
+ private Long windowLengthDurationMs;
+ private Integer slidingIntervalCount;
+ private Long slidingIntervalDurationMs;
+
+ private Map<String, String> userConfig = new HashMap<>();
+ private static final String JAVAJAR = "/pulsar/examples/api-examples.jar";
+ private static final String PYTHONBASE = "/pulsar/examples/python";
+
+ public static CommandGenerator createDefaultGenerator(String sourceTopic,
String functionClassName) {
+ CommandGenerator generator = new CommandGenerator();
+ generator.setSourceTopic(sourceTopic);
+ generator.setFunctionClassName(functionClassName);
+ generator.setRuntime(Runtime.JAVA);
+ return generator;
+ }
+
+ public static CommandGenerator createDefaultGenerator(Map<String, String>
customSereSourceTopics,
+ String
functionClassName) {
+ CommandGenerator generator = new CommandGenerator();
+ generator.setCustomSereSourceTopics(customSereSourceTopics);
+ generator.setFunctionClassName(functionClassName);
+ generator.setRuntime(Runtime.JAVA);
+ return generator;
+ }
+
+ public static CommandGenerator createDefaultGenerator(String tenant,
String namespace, String functionName) {
+ CommandGenerator generator = new CommandGenerator();
+ generator.setTenant(tenant);
+ generator.setNamespace(namespace);
+ generator.setFunctionName(functionName);
+ generator.setRuntime(Runtime.JAVA);
+ return generator;
+ }
+
+ public void createAdminUrl(String workerHost, int port) {
+ adminUrl = "http://" + workerHost + ":" + port;
+ }
+
+ public String generateCreateFunctionCommand() {
+ return generateCreateFunctionCommand(null);
+ }
+
+ public String generateCreateFunctionCommand(String codeFile) {
+ StringBuilder commandBuilder = new
StringBuilder(PulsarCluster.ADMIN_SCRIPT);
+ if (adminUrl != null) {
+ commandBuilder.append(" --admin-url ");
+ commandBuilder.append(adminUrl);
+ }
+ commandBuilder.append(" functions create");
+ if (tenant != null) {
+ commandBuilder.append(" --tenant " + tenant);
+ }
+ if (namespace != null) {
+ commandBuilder.append(" --namespace " + namespace);
+ }
+ if (functionName != null) {
+ commandBuilder.append(" --name " + functionName);
+ }
+ commandBuilder.append(" --className " + functionClassName);
+ if (sourceTopic != null) {
+ commandBuilder.append(" --inputs " + sourceTopic);
+ }
+ if (logTopic != null) {
+ commandBuilder.append(" --logTopic " + logTopic);
+ }
+ if (customSereSourceTopics != null &&
!customSereSourceTopics.isEmpty()) {
+ commandBuilder.append(" --customSerdeInputs \'" + new
Gson().toJson(customSereSourceTopics) + "\'");
+ }
+ if (sinkTopic != null) {
+ commandBuilder.append(" --output " + sinkTopic);
+ }
+ if (outputSerDe != null) {
+ commandBuilder.append(" --outputSerdeClassName " + outputSerDe);
+ }
+ if (processingGuarantees != null) {
+ commandBuilder.append(" --processingGuarantees " +
processingGuarantees);
+ }
+ if (!userConfig.isEmpty()) {
+ commandBuilder.append(" --userConfig \'" + new
Gson().toJson(userConfig) + "\'");
+ }
+ if (parallelism != null) {
+ commandBuilder.append(" --parallelism " + parallelism);
+ }
+ if (windowLengthCount != null) {
+ commandBuilder.append(" --windowLengthCount " + windowLengthCount);
+ }
+ if (windowLengthDurationMs != null) {
+ commandBuilder.append(" --windowLengthDurationMs " +
windowLengthDurationMs);
+ }
+ if (slidingIntervalCount != null) {
+ commandBuilder.append( " --slidingIntervalCount " +
slidingIntervalCount);
+ }
+ if (slidingIntervalDurationMs != null) {
+ commandBuilder.append(" --slidingIntervalDurationMs " +
slidingIntervalDurationMs);
+ }
+
+ if (runtime == Runtime.JAVA) {
+ commandBuilder.append(" --jar " + JAVAJAR);
+ } else {
+ if (codeFile != null) {
+ commandBuilder.append(" --py " + PYTHONBASE + codeFile);
+ } else {
+ commandBuilder.append(" --py " + PYTHONBASE);
+ }
+ }
+ return commandBuilder.toString();
+ }
+
+ public String generateUpdateFunctionCommand() {
+ return generateUpdateFunctionCommand(null);
+ }
+
+ public String generateUpdateFunctionCommand(String codeFile) {
+ StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m
");
+ if (adminUrl == null) {
+ commandBuilder.append("/pulsar/bin/pulsar-admin functions update");
+ } else {
+ commandBuilder.append("/pulsar/bin/pulsar-admin");
+ commandBuilder.append(" --admin-url ");
+ commandBuilder.append(adminUrl);
+ commandBuilder.append(" functions update");
+ }
+ if (tenant != null) {
+ commandBuilder.append(" --tenant " + tenant);
+ }
+ if (namespace != null) {
+ commandBuilder.append(" --namespace " + namespace);
+ }
+ if (functionName != null) {
+ commandBuilder.append(" --name " + functionName);
+ }
+ commandBuilder.append(" --className " + functionClassName);
+ if (sourceTopic != null) {
+ commandBuilder.append(" --inputs " + sourceTopic);
+ }
+ if (customSereSourceTopics != null &&
!customSereSourceTopics.isEmpty()) {
+ commandBuilder.append(" --customSerdeInputs \'" + new
Gson().toJson(customSereSourceTopics) + "\'");
+ }
+ if (sinkTopic != null) {
+ commandBuilder.append(" --output " + sinkTopic);
+ }
+ if (logTopic != null) {
+ commandBuilder.append(" --logTopic " + logTopic);
+ }
+ if (outputSerDe != null) {
+ commandBuilder.append(" --outputSerdeClassName " + outputSerDe);
+ }
+ if (processingGuarantees != null) {
+ commandBuilder.append(" --processingGuarantees " +
processingGuarantees);
+ }
+ if (!userConfig.isEmpty()) {
+ commandBuilder.append(" --userConfig \'" + new
Gson().toJson(userConfig) + "\'");
+ }
+ if (parallelism != null) {
+ commandBuilder.append(" --parallelism " + parallelism);
+ }
+ if (windowLengthCount != null) {
+ commandBuilder.append(" --windowLengthCount " + windowLengthCount);
+ }
+ if (windowLengthDurationMs != null) {
+ commandBuilder.append(" --windowLengthDurationMs " +
windowLengthDurationMs);
+ }
+ if (slidingIntervalCount != null) {
+ commandBuilder.append(" --slidingIntervalCount " +
slidingIntervalCount);
+ }
+ if (slidingIntervalDurationMs != null) {
+ commandBuilder.append(" --slidingIntervalDurationMs " +
slidingIntervalDurationMs);
+ }
+
+ if (runtime == Runtime.JAVA) {
+ commandBuilder.append(" --jar " + JAVAJAR);
+ } else {
+ if (codeFile != null) {
+ commandBuilder.append(" --py " + PYTHONBASE + codeFile);
+ } else {
+ commandBuilder.append(" --py " + PYTHONBASE);
+ }
+ }
+ return commandBuilder.toString();
+ }
+
+ public String genereateDeleteFunctionCommand() {
+ StringBuilder commandBuilder = new
StringBuilder("/pulsar/bin/pulsar-admin functions delete");
+ if (tenant != null) {
+ commandBuilder.append(" --tenant " + tenant);
+ }
+ if (namespace != null) {
+ commandBuilder.append(" --namespace " + namespace);
+ }
+ if (functionName != null) {
+ commandBuilder.append(" --name " + functionName);
+ }
+ return commandBuilder.toString();
+ }
+
+ public String generateTriggerFunctionCommand(String triggerValue) {
+ StringBuilder commandBuilder = new
StringBuilder("/pulsar/bin/pulsar-admin functions trigger");
+ if (tenant != null) {
+ commandBuilder.append(" --tenant " + tenant);
+ }
+ if (namespace != null) {
+ commandBuilder.append(" --namespace " + namespace);
+ }
+ if (functionName != null) {
+ commandBuilder.append(" --name " + functionName);
+ }
+ commandBuilder.append(" --triggerValue " + triggerValue);
+ return commandBuilder.toString();
+ }
+}