This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e16f91c Run functions runtime test with thread mode (#2070)
e16f91c is described below
commit e16f91c116c133e1741d0a20b69644fa82fa02a0
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Jul 2 17:14:21 2018 -0700
Run functions runtime test with thread mode (#2070)
*Motivation*
Pulsar Functions supports both process and thread modes to run functions.
Currently the test case only tests process mode.
*Changes*
Improve the existing test to support testing Pulsar Functions in thread mode.
---
docker/pulsar/scripts/gen-yml-from-env.py | 28 ++++-
.../pulsar/functions/worker/WorkerConfig.java | 1 -
.../pulsar/tests/topologies/PulsarCluster.java | 39 +++++--
.../integration/functions/PulsarFunctionsTest.java | 130 ---------------------
.../functions/PulsarFunctionsTestBase.java | 10 +-
.../runtime/PulsarFunctionsProcessRuntimeTest.java | 25 ++++
.../PulsarFunctionsRuntimeTest.java} | 66 +----------
.../PulsarFunctionsThreadRuntimeTest.java} | 24 ++--
8 files changed, 102 insertions(+), 221 deletions(-)
diff --git a/docker/pulsar/scripts/gen-yml-from-env.py
b/docker/pulsar/scripts/gen-yml-from-env.py
index 28599a5..45fe2a5 100755
--- a/docker/pulsar/scripts/gen-yml-from-env.py
+++ b/docker/pulsar/scripts/gen-yml-from-env.py
@@ -27,6 +27,18 @@
import os, sys
import yaml
+INT_KEYS = [
+ 'workerPort',
+ 'numFunctionPackageReplicas',
+ 'failureCheckFreqMs',
+ 'rescheduleTimeoutMs',
+ 'initialBrokerReconnectMaxRetries',
+ 'assignmentWriteMaxRetries',
+ 'instanceLivenessCheckFreqMs'
+]
+
+PF_ENV_PREFIX = 'PF_'
+
if len(sys.argv) < 2:
print 'Usage: %s' % (sys.argv[0])
sys.exit(1)
@@ -39,25 +51,29 @@ for conf_filename in conf_files:
# update the config
modified = False
for k in sorted(os.environ.keys()):
- key_parts = k.split('_')
+ if not k.startswith(PF_ENV_PREFIX):
+ continue
+
v = os.environ[k]
+ k = k[len(PF_ENV_PREFIX):]
+ key_parts = k.split('_')
+
i = 0
conf_to_modify = conf
while i < len(key_parts):
key_part = key_parts[i]
- if not key_part in conf_to_modify:
- break
-
if i == (len(key_parts) - 1):
- if key_part == 'workerPort':
+ if key_part in INT_KEYS:
conf_to_modify[key_part] = int(v)
else:
conf_to_modify[key_part] = v
-
modified = True
else:
+ if not key_part in conf_to_modify:
+ conf_to_modify[key_part] = {}
conf_to_modify = conf_to_modify[key_part]
+ modified = True
i += 1
# Store back the updated config in the same file
f = open(conf_filename , 'w')
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index efbd0fd..3e8f4b3 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -54,7 +54,6 @@ public class WorkerConfig implements Serializable {
private String pulsarFunctionsCluster;
private int numFunctionPackageReplicas;
private String downloadDirectory;
- private long snapshotFreqMs;
private String stateStorageServiceUrl;
private String functionAssignmentTopicName;
private String schedulerClassName;
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index a721915..c7bcf62 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -183,7 +183,7 @@ public class PulsarCluster {
}
}
- public void startFunctionWorkers(int numFunctionWorkers) {
+ public void startFunctionWorkersWithProcessContainerFactory(int
numFunctionWorkers) {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
@@ -193,14 +193,39 @@ public class PulsarCluster {
.withNetwork(network)
.withNetworkAliases(name)
// worker settings
- .withEnv("workerId", name)
- .withEnv("workerHostname", name)
- .withEnv("workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
- .withEnv("pulsarFunctionsCluster", clusterName)
- .withEnv("pulsarServiceUrl", serviceUrl)
- .withEnv("pulsarWebServiceUrl", httpServiceUrl)
+ .withEnv("PF_workerId", name)
+ .withEnv("PF_workerHostname", name)
+ .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_pulsarFunctionsCluster", clusterName)
+ .withEnv("PF_pulsarServiceUrl", serviceUrl)
+ .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+ // script
.withEnv("clusterName", clusterName)
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ // bookkeeper tools
+ .withEnv("zkServers", ZKContainer.NAME)
+ ));
+ }
+
+ public void startFunctionWorkersWithThreadContainerFactory(int
numFunctionWorkers) {
+ String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
+ String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
+ workerContainers.putAll(runNumContainers(
+ "functions-worker",
+ numFunctionWorkers,
+ (name) -> new WorkerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ // worker settings
+ .withEnv("PF_workerId", name)
+ .withEnv("PF_workerHostname", name)
+ .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_pulsarFunctionsCluster", clusterName)
+ .withEnv("PF_pulsarServiceUrl", serviceUrl)
+ .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+ .withEnv("PF_threadContainerFactory_threadGroupName",
"pf-container-group")
// script
+ .withEnv("clusterName", clusterName)
.withEnv("zookeeperServers", ZKContainer.NAME)
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME)
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index a35d466..625d800 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,20 +18,10 @@
*/
package org.apache.pulsar.tests.integration.functions;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.containers.WorkerContainer;
-import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
-import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
@@ -94,126 +84,6 @@ public class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
assertTrue(output.getStderr().isEmpty());
}
- //
- // Test CRUD functions on different runtimes.
- //
-
- @Test(dataProvider = "FunctionRuntimes")
- public void testExclamationFunction(Runtime runtime) throws Exception {
- String inputTopicName = "test-exclamation-" + runtime + "-input-" +
randomName(8);
- String outputTopicName = "test-exclamation-" + runtime + "-output-" +
randomName(8);
- String functionName = "test-exclamation-fn-" + randomName(8);
- final int numMessages = 10;
-
- // submit the exclamation function
- submitExclamationFunction(
- inputTopicName, outputTopicName, functionName);
-
- // get function info
- getFunctionInfoSuccess(functionName);
-
- // publish and consume result
- publishAndConsumeMessages(inputTopicName, outputTopicName,
numMessages);
-
- // get function status
- getFunctionStatus(functionName, numMessages);
-
- // delete function
- deleteFunction(functionName);
- // get function info
- getFunctionInfoNotFound(functionName);
- }
-
- private static void submitExclamationFunction(String inputTopicName,
- String outputTopicName,
- String functionName) throws
Exception {
- CommandGenerator generator =
CommandGenerator.createDefaultGenerator(inputTopicName, EXCLAMATION_FUNC_CLASS);
- generator.setSinkTopic(outputTopicName);
- generator.setFunctionName(functionName);
- String command = generator.generateCreateFunctionCommand();
- String[] commands = {
- "sh", "-c", command
- };
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
- commands);
- assertTrue(result.getStdout().contains("\"Created successfully\""));
- }
-
- private static void getFunctionInfoSuccess(String functionName) throws
Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStdout().contains("\"name\": \"" + functionName +
"\""));
- }
-
- private static void getFunctionInfoNotFound(String functionName) throws
Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "get",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStderr().contains("Reason: Function " +
functionName + " doesn't exist"));
- }
-
- private static void getFunctionStatus(String functionName, int
numMessages) throws Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "getstatus",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStdout().contains("\"running\": true"));
- assertTrue(result.getStdout().contains("\"numProcessed\": \"" +
numMessages + "\""));
- assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\":
\"" + numMessages + "\""));
- }
-
- private static void publishAndConsumeMessages(String inputTopic,
- String outputTopic,
- int numMessages) throws
Exception {
- @Cleanup PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
- @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic(outputTopic)
- .subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName("test-sub")
- .subscribe();
- @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
- .topic(inputTopic)
- .create();
-
- for (int i = 0; i < numMessages; i++) {
- producer.send("message-" + i);
- }
-
- for (int i = 0; i < numMessages; i++) {
- Message<String> msg = consumer.receive();
- assertEquals("message-" + i + "!", msg.getValue());
- }
- }
-
- private static void deleteFunction(String functionName) throws Exception {
- ExecResult result = pulsarCluster.getAnyWorker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "functions",
- "delete",
- "--tenant", "public",
- "--namespace", "default",
- "--name", functionName
- );
- assertTrue(result.getStdout().contains("Deleted successfully"));
- assertTrue(result.getStderr().isEmpty());
- }
}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index c0acd1d..dc2e290 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -18,15 +18,18 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
/**
* A cluster to run pulsar functions for testing functions related features.
*/
-public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
+@Slf4j
+public abstract class PulsarFunctionsTestBase extends PulsarClusterTestBase {
public static final String EXCLAMATION_FUNC_CLASS =
"org.apache.pulsar.functions.api.examples.ExclamationFunction";
@@ -35,7 +38,10 @@ public class PulsarFunctionsTestBase extends
PulsarClusterTestBase {
public static void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster();
- pulsarCluster.startFunctionWorkers(1);
+ pulsarCluster.startFunctionWorkersWithProcessContainerFactory(1);
+
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat",
"/pulsar/conf/functions_worker.yml");
+ log.info("Functions Worker Config : \n{}", result.getStdout());
}
@DataProvider(name = "FunctionRuntimes")
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
new file mode 100644
index 0000000..7c4033f
--- /dev/null
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsProcessRuntimeTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.runtime;
+
+/**
+ * Run runtime tests in process mode.
+ */
+public class PulsarFunctionsProcessRuntimeTest extends
PulsarFunctionsRuntimeTest {
+}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
similarity index 75%
copy from
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
copy to
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
index a35d466..e816f70 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsRuntimeTest.java
@@ -16,83 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.tests.integration.functions;
+package org.apache.pulsar.tests.integration.functions.runtime;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.containers.WorkerContainer;
+import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import
org.apache.pulsar.tests.integration.functions.utils.UploadDownloadCommandGenerator;
import org.apache.pulsar.tests.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.Test;
-@Slf4j
-public class PulsarFunctionsTest extends PulsarFunctionsTestBase {
-
- //
- // Tests on uploading/downloading function packages.
- //
-
- @Test
- public String checkUpload() throws Exception {
- String bkPkgPath = String.format("%s/%s/%s",
- "tenant-" + randomName(8),
- "ns-" + randomName(8),
- "fn-" + randomName(8));
-
- UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createUploader(
- PulsarCluster.ADMIN_SCRIPT,
- bkPkgPath);
- String actualCommand = generator.generateCommand();
-
- log.info(actualCommand);
-
- String[] commands = {
- "sh", "-c", actualCommand
- };
- ExecResult output = pulsarCluster.getAnyWorker().execCmd(commands);
- assertTrue(output.getStdout().contains("\"Uploaded successfully\""));
- return bkPkgPath;
- }
-
- @Test
- public void checkDownload() throws Exception {
- String bkPkgPath = checkUpload();
- String localPkgFile = "/tmp/checkdownload-" + randomName(16);
-
- UploadDownloadCommandGenerator generator =
UploadDownloadCommandGenerator.createDownloader(
- localPkgFile,
- bkPkgPath);
- String actualCommand = generator.generateCommand();
-
- log.info(actualCommand);
-
- String[] commands = {
- "sh", "-c", actualCommand
- };
- WorkerContainer container = pulsarCluster.getAnyWorker();
- ExecResult output = container.execCmd(commands);
- assertTrue(output.getStdout().contains("\"Downloaded successfully\""));
- String[] diffCommand = {
- "diff",
- PulsarCluster.ADMIN_SCRIPT,
- localPkgFile
- };
- output = container.execCmd(diffCommand);
- assertTrue(output.getStdout().isEmpty());
- assertTrue(output.getStderr().isEmpty());
- }
+/**
+ * The tests that run over different container mode.
+ */
+public abstract class PulsarFunctionsRuntimeTest extends
PulsarFunctionsTestBase {
//
// Test CRUD functions on different runtimes.
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
similarity index 60%
copy from
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
copy to
tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
index c0acd1d..61ddba3 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/functions/runtime/PulsarFunctionsThreadRuntimeTest.java
@@ -16,33 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.tests.integration.functions;
+package org.apache.pulsar.tests.integration.functions.runtime;
-import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Container.ExecResult;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
/**
- * A cluster to run pulsar functions for testing functions related features.
+ * Run the runtime test cases in thread mode.
*/
-public class PulsarFunctionsTestBase extends PulsarClusterTestBase {
-
- public static final String EXCLAMATION_FUNC_CLASS =
- "org.apache.pulsar.functions.api.examples.ExclamationFunction";
+@Slf4j
+public class PulsarFunctionsThreadRuntimeTest extends
PulsarFunctionsRuntimeTest {
@BeforeClass
public static void setupCluster() throws Exception {
PulsarClusterTestBase.setupCluster();
- pulsarCluster.startFunctionWorkers(1);
- }
+ pulsarCluster.startFunctionWorkersWithThreadContainerFactory(1);
- @DataProvider(name = "FunctionRuntimes")
- public static Object[][] functionRuntimes() {
- return new Object[][] {
- new Object[] { Runtime.JAVA }
- };
+ ExecResult result = pulsarCluster.getAnyWorker().execCmd("cat",
"/pulsar/conf/functions_worker.yml");
+ log.info("Functions Worker Config : \n{}", result.getStdout());
}
}