This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 791b596  Add integration tests on pulsar functions on process mode (#2063)
791b596 is described below

commit 791b5963d08fa002659706e28e73abdf8f80aa56
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 2 10:22:17 2018 -0700

    Add integration tests on pulsar functions on process mode (#2063)
---
 tests/integration/semantics/pom.xml                |   4 +
 .../integration/functions/PulsarFunctionsTest.java | 138 ++++++++++-
 .../functions/PulsarFunctionsTestBase.java         |  12 +
 .../functions/utils/CommandGenerator.java          | 260 +++++++++++++++++++++
 4 files changed, 412 insertions(+), 2 deletions(-)

diff --git a/tests/integration/semantics/pom.xml b/tests/integration/semantics/pom.xml
index 5e8eeca..63c1621 100644
--- a/tests/integration/semantics/pom.xml
+++ b/tests/integration/semantics/pom.xml
@@ -38,6 +38,10 @@
 
   <dependencies>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 07d86f7..a35d466 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-import com.google.common.io.Files;
-import java.io.File;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.tests.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import 
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
 import org.apache.pulsar.tests.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
@@ -32,6 +40,10 @@ import org.testng.annotations.Test;
 @Slf4j
 public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
+    //
+    // Tests on uploading/downloading function packages.
+    //
+
     @Test
     public String checkUpload() throws Exception {
         String bkPkgPath = String.format("%s/%s/%s",
@@ -82,4 +94,126 @@ public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         assertTrue(output.getStderr().isEmpty());
     }
 
+    //
+    // Test CRUD functions on different runtimes.
+    //
+
+    /**
+     * Walks the exclamation function through its lifecycle: create, get info,
+     * process messages, get status, delete, and finally confirm it is gone.
+     *
+     * @param runtime value supplied by the "FunctionRuntimes" data provider; within this
+     *                method it is only used to build the input and output topic names
+     */
+    @Test(dataProvider = "FunctionRuntimes")
+    public void testExclamationFunction(Runtime runtime) throws Exception {
+        String inputTopicName = String.format("test-exclamation-%s-input-%s", runtime, randomName(8));
+        String outputTopicName = String.format("test-exclamation-%s-output-%s", runtime, randomName(8));
+        String functionName = "test-exclamation-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // create
+        submitExclamationFunction(inputTopicName, outputTopicName, functionName);
+
+        // the function must now be retrievable
+        getFunctionInfoSuccess(functionName);
+
+        // push messages through the function and check its output
+        publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+
+        // the status must reflect the processed messages
+        getFunctionStatus(functionName, numMessages);
+
+        // delete, then verify the function can no longer be found
+        deleteFunction(functionName);
+        getFunctionInfoNotFound(functionName);
+    }
+
+    /**
+     * Creates the exclamation function by running the generated create command through
+     * {@code sh -c} on the worker returned by {@code getAnyWorker()}, and asserts that
+     * stdout reports a successful creation.
+     *
+     * @param inputTopicName topic the function reads from
+     * @param outputTopicName topic the function writes to
+     * @param functionName name to register the function under
+     */
+    private static void submitExclamationFunction(String inputTopicName,
+                                                  String outputTopicName,
+                                                  String functionName) throws Exception {
+        CommandGenerator cmdGenerator =
+            CommandGenerator.createDefaultGenerator(inputTopicName, EXCLAMATION_FUNC_CLASS);
+        cmdGenerator.setSinkTopic(outputTopicName);
+        cmdGenerator.setFunctionName(functionName);
+
+        String createCommand = cmdGenerator.generateCreateFunctionCommand();
+        String[] shellInvocation = new String[] { "sh", "-c", createCommand };
+        ExecResult execResult = pulsarCluster.getAnyWorker().execCmd(shellInvocation);
+
+        String stdout = execResult.getStdout();
+        assertTrue(stdout.contains("\"Created successfully\""));
+    }
+
+    /**
+     * Runs {@code functions get} for the given function in tenant "public", namespace
+     * "default", and asserts that stdout contains the function's {@code "name"} entry.
+     *
+     * @param functionName name of the function expected to exist
+     */
+    private static void getFunctionInfoSuccess(String functionName) throws Exception {
+        String[] getCommand = {
+            PulsarCluster.ADMIN_SCRIPT, "functions", "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        };
+        String stdout = pulsarCluster.getAnyWorker().execCmd(getCommand).getStdout();
+        String expectedNameEntry = "\"name\": \"" + functionName + "\"";
+        assertTrue(stdout.contains(expectedNameEntry));
+    }
+
+    /**
+     * Runs {@code functions get} for the given function in tenant "public", namespace
+     * "default", and asserts that stderr reports that the function doesn't exist.
+     *
+     * @param functionName name of the function expected to be absent
+     */
+    private static void getFunctionInfoNotFound(String functionName) throws Exception {
+        String[] getCommand = {
+            PulsarCluster.ADMIN_SCRIPT, "functions", "get",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        };
+        String stderr = pulsarCluster.getAnyWorker().execCmd(getCommand).getStderr();
+        String expectedReason = "Reason: Function " + functionName + " doesn't exist";
+        assertTrue(stderr.contains(expectedReason));
+    }
+
+    /**
+     * Runs {@code functions getstatus} for the given function in tenant "public", namespace
+     * "default", and asserts that stdout reports it as running with {@code numMessages}
+     * messages both processed and successfully processed.
+     *
+     * @param functionName name of the function to inspect
+     * @param numMessages expected value of both processed counters
+     */
+    private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
+        String[] statusCommand = {
+            PulsarCluster.ADMIN_SCRIPT, "functions", "getstatus",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        };
+        String stdout = pulsarCluster.getAnyWorker().execCmd(statusCommand).getStdout();
+
+        // The counters are matched as quoted values in the status output.
+        String quotedCount = "\"" + numMessages + "\"";
+        assertTrue(stdout.contains("\"running\": true"));
+        assertTrue(stdout.contains("\"numProcessed\": " + quotedCount));
+        assertTrue(stdout.contains("\"numSuccessfullyProcessed\": " + quotedCount));
+    }
+
+    /**
+     * Publishes {@code numMessages} string messages ("message-0", "message-1", ...) to the
+     * input topic and verifies that the same messages, each with a trailing "!", arrive in
+     * that order on the output topic.
+     *
+     * @param inputTopic topic the producer writes to
+     * @param outputTopic topic the consumer subscribes to (exclusive subscription "test-sub")
+     * @param numMessages number of messages to publish and to expect back
+     * @throws Exception if the client, consumer or producer cannot be created or used
+     */
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        // Subscribe before publishing so the consumer is in place when output is produced.
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            // Bounded receive: if the function never emits output, fail the test instead of
+            // blocking the whole run indefinitely.
+            Message<String> msg = consumer.receive(60, java.util.concurrent.TimeUnit.SECONDS);
+            assertTrue(msg != null, "Timed out waiting for output message " + i);
+            // TestNG's assertEquals takes (actual, expected).
+            assertEquals(msg.getValue(), "message-" + i + "!");
+        }
+    }
+
+    /**
+     * Runs {@code functions delete} for the given function in tenant "public", namespace
+     * "default", and asserts that stdout reports success and stderr is empty.
+     *
+     * @param functionName name of the function to delete
+     */
+    private static void deleteFunction(String functionName) throws Exception {
+        String[] deleteCommand = {
+            PulsarCluster.ADMIN_SCRIPT, "functions", "delete",
+            "--tenant", "public",
+            "--namespace", "default",
+            "--name", functionName
+        };
+        ExecResult execResult = pulsarCluster.getAnyWorker().execCmd(deleteCommand);
+
+        String stdout = execResult.getStdout();
+        assertTrue(stdout.contains("Deleted successfully"));
+        String stderr = execResult.getStderr();
+        assertTrue(stderr.isEmpty());
+    }
+
 }
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 8e692ad..c0acd1d 100644
--- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,14 +18,19 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 
 /**
  * A cluster to run pulsar functions for testing functions related features.
  */
 public class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
 
+    public static final String EXCLAMATION_FUNC_CLASS =
+        "org.apache.pulsar.functions.api.examples.ExclamationFunction";
+
     @BeforeClass
     public static void setupCluster() throws Exception {
         PulsarClusterTestBase.setupCluster();
@@ -33,4 +38,11 @@ public class PulsarFunctionsTestBase extends PulsarClusterTestBase  {
         pulsarCluster.startFunctionWorkers(1);
     }
 
+    /**
+     * TestNG data provider named "FunctionRuntimes"; each row holds one runtime to test.
+     * Only {@link Runtime#JAVA} is listed.
+     *
+     * @return one single-element argument row per runtime
+     */
+    @DataProvider(name = "FunctionRuntimes")
+    public static Object[][] functionRuntimes() {
+        Object[] javaRuntimeArgs = { Runtime.JAVA };
+        return new Object[][] { javaRuntimeArgs };
+    }
+
 }
diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
new file mode 100644
index 0000000..12225fe
--- /dev/null
+++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.utils;
+
+import com.google.gson.Gson;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.tests.topologies.PulsarCluster;
+
+@Getter
+@Setter
+@ToString
+public class CommandGenerator {
+    /**
+     * Runtime a function command is generated for: {@code JAVA} selects the {@code --jar}
+     * option, any other value selects {@code --py}.
+     */
+    public enum Runtime {
+        JAVA,
+        PYTHON
+    }
+    private String functionName;
+    private String tenant = "public";
+    private String namespace = "default";
+    private String functionClassName;
+    private String sourceTopic;
+    private Map<String, String> customSereSourceTopics;
+    private String sinkTopic;
+    private String logTopic;
+    private String outputSerDe;
+    private String processingGuarantees;
+    private Runtime runtime;
+    private Integer parallelism;
+    private String adminUrl;
+    private Integer windowLengthCount;
+    private Long windowLengthDurationMs;
+    private Integer slidingIntervalCount;
+    private Long slidingIntervalDurationMs;
+
+    private Map<String, String> userConfig = new HashMap<>();
+    private static final String JAVAJAR = "/pulsar/examples/api-examples.jar";
+    private static final String PYTHONBASE = "/pulsar/examples/python";
+
+    /**
+     * Creates a generator for a JAVA-runtime function reading from a single source topic.
+     *
+     * @param sourceTopic topic passed as {@code --inputs}
+     * @param functionClassName class passed as {@code --className}
+     * @return a new generator with those values set
+     */
+    public static CommandGenerator createDefaultGenerator(String sourceTopic, String functionClassName) {
+        CommandGenerator result = new CommandGenerator();
+        result.setRuntime(Runtime.JAVA);
+        result.setSourceTopic(sourceTopic);
+        result.setFunctionClassName(functionClassName);
+        return result;
+    }
+
+    /**
+     * Creates a generator for a JAVA-runtime function whose inputs are given as a map that
+     * is serialized to JSON for {@code --customSerdeInputs}.
+     *
+     * @param customSereSourceTopics map emitted as the {@code --customSerdeInputs} value
+     * @param functionClassName class passed as {@code --className}
+     * @return a new generator with those values set
+     */
+    public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
+                                                          String functionClassName) {
+        CommandGenerator result = new CommandGenerator();
+        result.setRuntime(Runtime.JAVA);
+        result.setCustomSereSourceTopics(customSereSourceTopics);
+        result.setFunctionClassName(functionClassName);
+        return result;
+    }
+
+    /**
+     * Creates a generator for a JAVA-runtime function identified by tenant, namespace and
+     * name; no source topic or class name is set.
+     *
+     * @param tenant value passed as {@code --tenant}
+     * @param namespace value passed as {@code --namespace}
+     * @param functionName value passed as {@code --name}
+     * @return a new generator with those values set
+     */
+    public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) {
+        CommandGenerator result = new CommandGenerator();
+        result.setRuntime(Runtime.JAVA);
+        result.setTenant(tenant);
+        result.setNamespace(namespace);
+        result.setFunctionName(functionName);
+        return result;
+    }
+
+    /**
+     * Sets the admin URL to {@code http://<workerHost>:<port>}. Once set, the generated
+     * create and update commands pass it via {@code --admin-url}.
+     *
+     * @param workerHost host name or address of the worker
+     * @param port port of the worker's admin endpoint
+     */
+    public void createAdminUrl(String workerHost, int port) {
+        adminUrl = "http://" + workerHost + ":" + port;
+    }
+
+    /**
+     * Builds the {@code functions create} command without a Python code file.
+     *
+     * @return the same string as {@code generateCreateFunctionCommand(null)}
+     */
+    public String generateCreateFunctionCommand() {
+        return generateCreateFunctionCommand(null);
+    }
+
+    /**
+     * Builds the {@code functions create} command line from the values set on this generator.
+     * {@code --className} is always emitted; every other option is emitted only when its
+     * field is non-null (for the maps: non-empty). {@code userConfig} must not be null.
+     *
+     * @param codeFile suffix appended directly to the Python base path when the runtime is
+     *                 not JAVA; may be null, in which case only the base path is used
+     * @return the complete command string
+     */
+    public String generateCreateFunctionCommand(String codeFile) {
+        StringBuilder cmd = new StringBuilder(PulsarCluster.ADMIN_SCRIPT);
+        if (adminUrl != null) {
+            cmd.append(" --admin-url ").append(adminUrl);
+        }
+        cmd.append(" functions create");
+
+        // Function identity.
+        if (tenant != null) {
+            cmd.append(" --tenant ").append(tenant);
+        }
+        if (namespace != null) {
+            cmd.append(" --namespace ").append(namespace);
+        }
+        if (functionName != null) {
+            cmd.append(" --name ").append(functionName);
+        }
+        cmd.append(" --className ").append(functionClassName);
+
+        // Inputs, outputs and processing options.
+        if (sourceTopic != null) {
+            cmd.append(" --inputs ").append(sourceTopic);
+        }
+        if (logTopic != null) {
+            cmd.append(" --logTopic ").append(logTopic);
+        }
+        if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
+            String serdeInputsJson = new Gson().toJson(customSereSourceTopics);
+            cmd.append(" --customSerdeInputs '").append(serdeInputsJson).append("'");
+        }
+        if (sinkTopic != null) {
+            cmd.append(" --output ").append(sinkTopic);
+        }
+        if (outputSerDe != null) {
+            cmd.append(" --outputSerdeClassName ").append(outputSerDe);
+        }
+        if (processingGuarantees != null) {
+            cmd.append(" --processingGuarantees ").append(processingGuarantees);
+        }
+        if (!userConfig.isEmpty()) {
+            String userConfigJson = new Gson().toJson(userConfig);
+            cmd.append(" --userConfig '").append(userConfigJson).append("'");
+        }
+        if (parallelism != null) {
+            cmd.append(" --parallelism ").append(parallelism);
+        }
+
+        // Windowing options.
+        if (windowLengthCount != null) {
+            cmd.append(" --windowLengthCount ").append(windowLengthCount);
+        }
+        if (windowLengthDurationMs != null) {
+            cmd.append(" --windowLengthDurationMs ").append(windowLengthDurationMs);
+        }
+        if (slidingIntervalCount != null) {
+            cmd.append(" --slidingIntervalCount ").append(slidingIntervalCount);
+        }
+        if (slidingIntervalDurationMs != null) {
+            cmd.append(" --slidingIntervalDurationMs ").append(slidingIntervalDurationMs);
+        }
+
+        // Code location: the examples jar for JAVA, otherwise the Python base path.
+        if (runtime == Runtime.JAVA) {
+            cmd.append(" --jar ").append(JAVAJAR);
+        } else {
+            cmd.append(" --py ").append(PYTHONBASE);
+            if (codeFile != null) {
+                cmd.append(codeFile);
+            }
+        }
+        return cmd.toString();
+    }
+
+    /**
+     * Builds the {@code functions update} command without a Python code file.
+     *
+     * @return the same string as {@code generateUpdateFunctionCommand(null)}
+     */
+    public String generateUpdateFunctionCommand() {
+        return generateUpdateFunctionCommand(null);
+    }
+
+    /**
+     * Builds the {@code functions update} command line from the values set on this generator.
+     * The command is prefixed with {@code PULSAR_MEM=-Xmx1024m} and invokes
+     * {@code /pulsar/bin/pulsar-admin}. {@code --className} is always emitted; every other
+     * option is emitted only when its field is non-null (for the maps: non-empty).
+     * {@code userConfig} must not be null.
+     *
+     * @param codeFile suffix appended directly to the Python base path when the runtime is
+     *                 not JAVA; may be null, in which case only the base path is used
+     * @return the complete command string
+     */
+    public String generateUpdateFunctionCommand(String codeFile) {
+        StringBuilder cmd = new StringBuilder("PULSAR_MEM=-Xmx1024m ");
+        cmd.append("/pulsar/bin/pulsar-admin");
+        if (adminUrl != null) {
+            cmd.append(" --admin-url ").append(adminUrl);
+        }
+        cmd.append(" functions update");
+
+        // Function identity.
+        if (tenant != null) {
+            cmd.append(" --tenant ").append(tenant);
+        }
+        if (namespace != null) {
+            cmd.append(" --namespace ").append(namespace);
+        }
+        if (functionName != null) {
+            cmd.append(" --name ").append(functionName);
+        }
+        cmd.append(" --className ").append(functionClassName);
+
+        // Inputs, outputs and processing options.
+        if (sourceTopic != null) {
+            cmd.append(" --inputs ").append(sourceTopic);
+        }
+        if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
+            String serdeInputsJson = new Gson().toJson(customSereSourceTopics);
+            cmd.append(" --customSerdeInputs '").append(serdeInputsJson).append("'");
+        }
+        if (sinkTopic != null) {
+            cmd.append(" --output ").append(sinkTopic);
+        }
+        if (logTopic != null) {
+            cmd.append(" --logTopic ").append(logTopic);
+        }
+        if (outputSerDe != null) {
+            cmd.append(" --outputSerdeClassName ").append(outputSerDe);
+        }
+        if (processingGuarantees != null) {
+            cmd.append(" --processingGuarantees ").append(processingGuarantees);
+        }
+        if (!userConfig.isEmpty()) {
+            String userConfigJson = new Gson().toJson(userConfig);
+            cmd.append(" --userConfig '").append(userConfigJson).append("'");
+        }
+        if (parallelism != null) {
+            cmd.append(" --parallelism ").append(parallelism);
+        }
+
+        // Windowing options.
+        if (windowLengthCount != null) {
+            cmd.append(" --windowLengthCount ").append(windowLengthCount);
+        }
+        if (windowLengthDurationMs != null) {
+            cmd.append(" --windowLengthDurationMs ").append(windowLengthDurationMs);
+        }
+        if (slidingIntervalCount != null) {
+            cmd.append(" --slidingIntervalCount ").append(slidingIntervalCount);
+        }
+        if (slidingIntervalDurationMs != null) {
+            cmd.append(" --slidingIntervalDurationMs ").append(slidingIntervalDurationMs);
+        }
+
+        // Code location: the examples jar for JAVA, otherwise the Python base path.
+        if (runtime == Runtime.JAVA) {
+            cmd.append(" --jar ").append(JAVAJAR);
+        } else {
+            cmd.append(" --py ").append(PYTHONBASE);
+            if (codeFile != null) {
+                cmd.append(codeFile);
+            }
+        }
+        return cmd.toString();
+    }
+
+    /**
+     * Builds the {@code functions delete} command line, adding {@code --tenant},
+     * {@code --namespace} and {@code --name} for each of those fields that is non-null.
+     *
+     * @return the complete command string
+     */
+    public String genereateDeleteFunctionCommand() {
+        StringBuilder cmd = new StringBuilder("/pulsar/bin/pulsar-admin functions delete");
+        if (tenant != null) {
+            cmd.append(" --tenant ").append(tenant);
+        }
+        if (namespace != null) {
+            cmd.append(" --namespace ").append(namespace);
+        }
+        if (functionName != null) {
+            cmd.append(" --name ").append(functionName);
+        }
+        return cmd.toString();
+    }
+
+    /**
+     * Builds the {@code functions trigger} command line, adding {@code --tenant},
+     * {@code --namespace} and {@code --name} for each of those fields that is non-null,
+     * followed by {@code --triggerValue}, which is always emitted.
+     *
+     * @param triggerValue value appended unquoted after {@code --triggerValue}
+     * @return the complete command string
+     */
+    public String generateTriggerFunctionCommand(String triggerValue) {
+        StringBuilder cmd = new StringBuilder("/pulsar/bin/pulsar-admin functions trigger");
+        if (tenant != null) {
+            cmd.append(" --tenant ").append(tenant);
+        }
+        if (namespace != null) {
+            cmd.append(" --namespace ").append(namespace);
+        }
+        if (functionName != null) {
+            cmd.append(" --name ").append(functionName);
+        }
+        cmd.append(" --triggerValue ").append(triggerValue);
+        return cmd.toString();
+    }
+}

Reply via email to