sijie closed pull request #2063: Add an integration test on pulsar functions on
process mode
URL: https://github.com/apache/incubator-pulsar/pull/2063
This PR was merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/tests/integration/semantics/pom.xml
b/tests/integration/semantics/pom.xml
index 5e8eecaa7d..63c16219ed 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -37,6 +37,10 @@
<name>Apache Pulsar :: Tests :: Integration Tests :: Semantics</name>
<dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 07d86f7f0c..a35d4662b2 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,12 +18,20 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import com.google.common.io.Files;
-import java.io.File;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
@@ -32,6 +40,10 @@
@Slf4j
public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+ //
+ // Tests on uploading/downloading function packages.
+ //
+
@Test
public String checkUpload() throws Exception {
String bkPkgPath = String.format("%s/%s/%s",
@@ -82,4 +94,126 @@ public void checkDownload() throws Exception {
assertTrue(output.getStderr().isEmpty());
}
+ //
+ // Test CRUD functions on different runtimes.
+ //
+
+    /**
+     * Runs the exclamation function through its whole lifecycle on the given runtime:
+     * create it, fetch its info, push messages through it, check its status, delete it
+     * and finally confirm that it can no longer be found.
+     *
+     * @param runtime function runtime supplied by the "FunctionRuntimes" data provider
+     * @throws Exception if any admin command or client operation fails
+     */
+    @Test(dataProvider = "FunctionRuntimes")
+    public void testExclamationFunction(Runtime runtime) throws Exception {
+        String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
+        String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+        String functionName = "test-exclamation-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the exclamation function
+        submitExclamationFunction(runtime, inputTopicName, outputTopicName, functionName);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+
+        // get function status
+        getFunctionStatus(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+    }
+
+    /**
+     * Creates the exclamation function on a worker and asserts that the admin CLI
+     * reports success.
+     *
+     * @param runtime         runtime the function must be created with
+     * @param inputTopicName  topic the function reads from
+     * @param outputTopicName topic the function writes to
+     * @param functionName    name to register the function under
+     * @throws Exception if the command cannot be executed on the worker
+     */
+    private static void submitExclamationFunction(Runtime runtime,
+                                                  String inputTopicName,
+                                                  String outputTopicName,
+                                                  String functionName) throws Exception {
+        CommandGenerator generator =
+            CommandGenerator.createDefaultGenerator(inputTopicName, EXCLAMATION_FUNC_CLASS);
+        // createDefaultGenerator always selects Runtime.JAVA, so the runtime under test
+        // has to be applied explicitly; otherwise every data-provider row tests Java.
+        // NOTE(review): a non-Java row will also need its own class name / code file.
+        generator.setRuntime(runtime);
+        generator.setSinkTopic(outputTopicName);
+        generator.setFunctionName(functionName);
+        String command = generator.generateCreateFunctionCommand();
+        // The generated command is a single string, so it is run through a shell.
+        String[] commands = {
+            "sh", "-c", command
+        };
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+        assertTrue(
+            result.getStdout().contains("\"Created successfully\""),
+            "Failed to create function " + functionName
+                + " : stdout = " + result.getStdout()
+                + ", stderr = " + result.getStderr());
+    }
+
+ private static void getFunctionInfoSuccess(String functionName) throws
Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
+ }
+
+ private static void getFunctionInfoNotFound(String functionName) throws
Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStderr().contains("Reason: Function " +
functionName + " doesn't exist"));
+ }
+
+    /**
+     * Queries the function's status through the admin CLI on a worker and asserts that
+     * it is running and has processed, successfully, exactly {@code numMessages}
+     * messages.
+     *
+     * @param functionName name of the function to inspect
+     * @param numMessages  number of messages the function is expected to have processed
+     * @throws Exception if the command cannot be executed on the worker
+     */
+    private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
+        ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+            PulsarCluster.ADMIN_SCRIPT,
+            "functions",
+            "getstatus",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        );
+        // The counters are matched as quoted strings in the status output.
+        assertTrue(result.getStdout().contains("\"running\": true"));
+        assertTrue(result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\""));
+        assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\": \"" + numMessages + "\""));
+    }
+
+    /**
+     * Publishes {@code numMessages} messages ("message-0", "message-1", ...) to the
+     * input topic and asserts that the output topic delivers, in order, the same
+     * payloads with a trailing "!".
+     *
+     * @param inputTopic  topic to publish to
+     * @param outputTopic topic to read the results from
+     * @param numMessages number of messages to send and expect back
+     * @throws Exception if the client, consumer or producer fails
+     */
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages) throws Exception {
+        // Upper bound for a single receive; without it a function that never emits
+        // anything would hang the whole test run instead of failing it.
+        final int receiveTimeoutSeconds = 60;
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        // Subscribe before publishing so the subscription exists when results arrive.
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            // receive(timeout, unit) returns null when the timeout elapses.
+            Message<String> msg =
+                consumer.receive(receiveTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
+            assertTrue(msg != null,
+                "Timed out waiting for result " + i + " on topic " + outputTopic);
+            // TestNG's assertEquals takes (actual, expected).
+            assertEquals(msg.getValue(), "message-" + i + "!");
+        }
+    }
+
+ private static void deleteFunction(String functionName) throws Exception {
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "functions",
+ "delete",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", functionName
+ );
+ assertTrue(result.getStdout().contains("Deleted successfully"));
+ assertTrue(result.getStderr().isEmpty());
+ }
+
}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8e692ad7b0..c0acd1de54 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,14 +18,19 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
/**
* A cluster to run pulsar functions for testing functions related features.
*/
public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+ /**
+ * Fully-qualified class name of the example exclamation function; the tests pass it
+ * as the function class name when they create the function.
+ */
+ public static final String EXCLAMATION_FUNC_CLASS =
+ "org.apache.pulsar.functions.api.examples.ExclamationFunction";
+
@BeforeClass
public static void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster();
@@ -33,4 +38,11 @@ public static void setupCluster() throws Exception {
pulsarCluster.startFunctionWorkers(1);
}
+    /**
+     * Data provider "FunctionRuntimes": one row per runtime the function tests run
+     * against. Only the Java runtime is listed.
+     *
+     * @return rows holding a single {@link Runtime} argument each
+     */
+    @DataProvider(name = "FunctionRuntimes")
+    public static Object[][] functionRuntimes() {
+        final Object[] javaRow = { Runtime.JAVA };
+        return new Object[][] { javaRow };
+    }
+
}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
new file mode 100644
index 0000000000..12225fec10
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import com.google.gson.Gson;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+/**
+ * Assembles {@code pulsar-admin functions} command lines (create, update, delete and
+ * trigger) from the options configured on this object.
+ *
+ * <p>Options that are {@code null} (or empty, for the map-valued ones) are left out of
+ * the generated command. Plain values are inserted verbatim; only the JSON-encoded
+ * maps are wrapped in single quotes, so the result is meant to be run through a shell.
+ */
+@Getter
+@Setter
+@ToString
+public class CommandGenerator {
+
+    /** Runtime a function is packaged for; selects {@code --jar} or {@code --py}. */
+    public enum Runtime {
+        JAVA,
+        PYTHON,
+    }
+
+    // One shared serializer instead of a new Gson per generated option.
+    private static final Gson GSON = new Gson();
+    private static final String JAVAJAR = "/pulsar/examples/api-examples.jar";
+    private static final String PYTHONBASE = "/pulsar/examples/python";
+
+    private String functionName;
+    private String tenant = "public";
+    private String namespace = "default";
+    private String functionClassName;
+    private String sourceTopic;
+    private Map<String, String> customSereSourceTopics;
+    private String sinkTopic;
+    private String logTopic;
+    private String outputSerDe;
+    private String processingGuarantees;
+    private Runtime runtime;
+    private Integer parallelism;
+    private String adminUrl;
+    private Integer windowLengthCount;
+    private Long windowLengthDurationMs;
+    private Integer slidingIntervalCount;
+    private Long slidingIntervalDurationMs;
+
+    private Map<String, String> userConfig = new HashMap<>();
+
+    /**
+     * Creates a Java-runtime generator for a function reading a single source topic.
+     *
+     * @param sourceTopic       topic passed as {@code --inputs}
+     * @param functionClassName class passed as {@code --className}
+     * @return the configured generator
+     */
+    public static CommandGenerator createDefaultGenerator(String sourceTopic, String functionClassName) {
+        CommandGenerator generator = newJavaGenerator();
+        generator.setSourceTopic(sourceTopic);
+        generator.setFunctionClassName(functionClassName);
+        return generator;
+    }
+
+    /**
+     * Creates a Java-runtime generator for a function whose inputs are given as a map
+     * that is emitted as JSON under {@code --customSerdeInputs}.
+     *
+     * @param customSereSourceTopics map emitted as {@code --customSerdeInputs}
+     * @param functionClassName      class passed as {@code --className}
+     * @return the configured generator
+     */
+    public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
+                                                          String functionClassName) {
+        CommandGenerator generator = newJavaGenerator();
+        generator.setCustomSereSourceTopics(customSereSourceTopics);
+        generator.setFunctionClassName(functionClassName);
+        return generator;
+    }
+
+    /**
+     * Creates a Java-runtime generator that only identifies a function; no class name
+     * or topics are set.
+     *
+     * @param tenant       value for {@code --tenant}
+     * @param namespace    value for {@code --namespace}
+     * @param functionName value for {@code --name}
+     * @return the configured generator
+     */
+    public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) {
+        CommandGenerator generator = newJavaGenerator();
+        generator.setTenant(tenant);
+        generator.setNamespace(namespace);
+        generator.setFunctionName(functionName);
+        return generator;
+    }
+
+    /**
+     * Sets the admin URL to {@code http://<workerHost>:<port>}; the create and update
+     * commands then carry it as {@code --admin-url}.
+     *
+     * @param workerHost host name of the worker
+     * @param port       HTTP port of the worker
+     */
+    public void createAdminUrl(String workerHost, int port) {
+        adminUrl = "http://" + workerHost + ":" + port;
+    }
+
+    /**
+     * Generates the create command without an explicit code file.
+     *
+     * @return the {@code functions create} command line
+     */
+    public String generateCreateFunctionCommand() {
+        return generateCreateFunctionCommand(null);
+    }
+
+    /**
+     * Generates the {@code functions create} command line.
+     *
+     * @param codeFile Python file below the Python examples directory, or {@code null};
+     *                 ignored for the Java runtime
+     * @return the command line
+     */
+    public String generateCreateFunctionCommand(String codeFile) {
+        StringBuilder commandBuilder = new StringBuilder(PulsarCluster.ADMIN_SCRIPT);
+        appendOption(commandBuilder, "--admin-url", adminUrl);
+        commandBuilder.append(" functions create");
+        appendFunctionIdentity(commandBuilder);
+        // Guarded like every other option: a generator built from tenant/namespace/name
+        // has no class name and used to emit the literal "--className null".
+        appendOption(commandBuilder, "--className", functionClassName);
+        appendOption(commandBuilder, "--inputs", sourceTopic);
+        appendOption(commandBuilder, "--logTopic", logTopic);
+        appendJsonOption(commandBuilder, "--customSerdeInputs", customSereSourceTopics);
+        appendOption(commandBuilder, "--output", sinkTopic);
+        appendProcessingOptions(commandBuilder);
+        appendCodeLocation(commandBuilder, codeFile);
+        return commandBuilder.toString();
+    }
+
+    /**
+     * Generates the update command without an explicit code file.
+     *
+     * @return the {@code functions update} command line
+     */
+    public String generateUpdateFunctionCommand() {
+        return generateUpdateFunctionCommand(null);
+    }
+
+    /**
+     * Generates the {@code functions update} command line, prefixed with a
+     * {@code PULSAR_MEM} environment assignment.
+     *
+     * @param codeFile Python file below the Python examples directory, or {@code null};
+     *                 ignored for the Java runtime
+     * @return the command line
+     */
+    public String generateUpdateFunctionCommand(String codeFile) {
+        StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m /pulsar/bin/pulsar-admin");
+        appendOption(commandBuilder, "--admin-url", adminUrl);
+        commandBuilder.append(" functions update");
+        appendFunctionIdentity(commandBuilder);
+        appendOption(commandBuilder, "--className", functionClassName);
+        appendOption(commandBuilder, "--inputs", sourceTopic);
+        appendJsonOption(commandBuilder, "--customSerdeInputs", customSereSourceTopics);
+        appendOption(commandBuilder, "--output", sinkTopic);
+        appendOption(commandBuilder, "--logTopic", logTopic);
+        appendProcessingOptions(commandBuilder);
+        appendCodeLocation(commandBuilder, codeFile);
+        return commandBuilder.toString();
+    }
+
+    /**
+     * Generates the {@code functions delete} command line for the configured
+     * tenant, namespace and function name.
+     *
+     * @return the command line
+     */
+    public String generateDeleteFunctionCommand() {
+        StringBuilder commandBuilder = new StringBuilder("/pulsar/bin/pulsar-admin functions delete");
+        appendFunctionIdentity(commandBuilder);
+        return commandBuilder.toString();
+    }
+
+    /**
+     * Misspelled original name, kept so existing callers keep compiling.
+     *
+     * @return the {@code functions delete} command line
+     * @deprecated use {@link #generateDeleteFunctionCommand()}
+     */
+    @Deprecated
+    public String genereateDeleteFunctionCommand() {
+        return generateDeleteFunctionCommand();
+    }
+
+    /**
+     * Generates the {@code functions trigger} command line.
+     *
+     * @param triggerValue value passed verbatim as {@code --triggerValue}
+     * @return the command line
+     */
+    public String generateTriggerFunctionCommand(String triggerValue) {
+        StringBuilder commandBuilder = new StringBuilder("/pulsar/bin/pulsar-admin functions trigger");
+        appendFunctionIdentity(commandBuilder);
+        commandBuilder.append(" --triggerValue ").append(triggerValue);
+        return commandBuilder.toString();
+    }
+
+    /** Creates an otherwise unconfigured generator preset to the Java runtime. */
+    private static CommandGenerator newJavaGenerator() {
+        CommandGenerator generator = new CommandGenerator();
+        generator.setRuntime(Runtime.JAVA);
+        return generator;
+    }
+
+    /** Appends {@code " <flag> <value>"} unless the value is {@code null}. */
+    private static void appendOption(StringBuilder commandBuilder, String flag, Object value) {
+        if (value != null) {
+            commandBuilder.append(' ').append(flag).append(' ').append(value);
+        }
+    }
+
+    /** Appends {@code " <flag> '<json>'"} unless the map is {@code null} or empty. */
+    private static void appendJsonOption(StringBuilder commandBuilder, String flag, Map<String, String> values) {
+        if (values != null && !values.isEmpty()) {
+            commandBuilder.append(' ').append(flag).append(" '").append(GSON.toJson(values)).append('\'');
+        }
+    }
+
+    /** Appends the tenant, namespace and name options shared by every command. */
+    private void appendFunctionIdentity(StringBuilder commandBuilder) {
+        appendOption(commandBuilder, "--tenant", tenant);
+        appendOption(commandBuilder, "--namespace", namespace);
+        appendOption(commandBuilder, "--name", functionName);
+    }
+
+    /** Appends the serde, guarantee, user-config, parallelism and window options. */
+    private void appendProcessingOptions(StringBuilder commandBuilder) {
+        appendOption(commandBuilder, "--outputSerdeClassName", outputSerDe);
+        appendOption(commandBuilder, "--processingGuarantees", processingGuarantees);
+        // Null-safe: the generated setter allows userConfig to be reset to null.
+        appendJsonOption(commandBuilder, "--userConfig", userConfig);
+        appendOption(commandBuilder, "--parallelism", parallelism);
+        appendOption(commandBuilder, "--windowLengthCount", windowLengthCount);
+        appendOption(commandBuilder, "--windowLengthDurationMs", windowLengthDurationMs);
+        appendOption(commandBuilder, "--slidingIntervalCount", slidingIntervalCount);
+        appendOption(commandBuilder, "--slidingIntervalDurationMs", slidingIntervalDurationMs);
+    }
+
+    /**
+     * Appends {@code --jar} for the Java runtime and {@code --py} for anything else
+     * (including an unset runtime).
+     */
+    private void appendCodeLocation(StringBuilder commandBuilder, String codeFile) {
+        if (runtime == Runtime.JAVA) {
+            commandBuilder.append(" --jar ").append(JAVAJAR);
+            return;
+        }
+        commandBuilder.append(" --py ").append(PYTHONBASE);
+        if (codeFile != null) {
+            // PYTHONBASE has no trailing slash; add the separator unless the caller
+            // already supplied one, so "foo.py" and "/foo.py" both resolve below it.
+            if (!codeFile.startsWith("/")) {
+                commandBuilder.append('/');
+            }
+            commandBuilder.append(codeFile);
+        }
+    }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services