merlimat closed pull request #1362: More Pulsar Functions documentation
URL: https://github.com/apache/incubator-pulsar/pull/1362
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull
request once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/.gitignore b/.gitignore
index aa6ed821b..dcb737352 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,9 @@ pulsar-broker/src/test/resources/log4j2.yaml
# Mac
.DS_Store
+# VisualStudioCode artifacts
+.vscode/
+
# Maven
log/
target/
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5dcf070b0..eb7fed74a 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -65,7 +65,7 @@
import org.apache.pulsar.functions.utils.Utils;
@Slf4j
-@Parameters(commandDescription = "Operations about functions")
+@Parameters(commandDescription = "Operations for managing Pulsar Functions")
public class CmdFunctions extends CmdBase {
private final PulsarAdminWithFunctions fnAdmin;
@@ -100,10 +100,10 @@ void processArguments() throws Exception {}
*/
@Getter
abstract class NamespaceCommand extends BaseCommand {
- @Parameter(names = "--tenant", description = "Tenant Name", required =
true)
+ @Parameter(names = "--tenant", description = "Tenant name", required =
true)
protected String tenant;
- @Parameter(names = "--namespace", description = "Namespace Name",
required = true)
+ @Parameter(names = "--namespace", description = "Namespace name",
required = true)
protected String namespace;
}
@@ -112,7 +112,7 @@ void processArguments() throws Exception {}
*/
@Getter
abstract class FunctionCommand extends NamespaceCommand {
- @Parameter(names = "--name", description = "Function Name", required =
true)
+ @Parameter(names = "--name", description = "Function name", required =
true)
protected String functionName;
}
@@ -121,43 +121,43 @@ void processArguments() throws Exception {}
*/
@Getter
abstract class FunctionConfigCommand extends BaseCommand {
- @Parameter(names = "--tenant", description = "Tenant Name")
+ @Parameter(names = "--tenant", description = "Tenant name")
protected String tenant;
- @Parameter(names = "--namespace", description = "Namespace Name")
+ @Parameter(names = "--namespace", description = "Namespace name")
protected String namespace;
- @Parameter(names = "--name", description = "Function Name")
+ @Parameter(names = "--name", description = "Function name")
protected String functionName;
- @Parameter(names = "--className", description = "Function Class Name",
required = true)
+ @Parameter(names = "--className", description = "Function class name",
required = true)
protected String className;
@Parameter(
names = "--jar",
- description = "Path to Jar",
+ description = "Path to the Java JAR file",
listConverter = StringConverter.class)
protected String jarFile;
@Parameter(
names = "--py",
- description = "Path to Python",
+ description = "Path to the main Python file",
listConverter = StringConverter.class)
protected String pyFile;
- @Parameter(names = "--inputs", description = "Input Topic Name")
+ @Parameter(names = "--inputs", description = "Input topic name")
protected String inputs;
- @Parameter(names = "--output", description = "Output Topic Name")
+ @Parameter(names = "--output", description = "Output topic name")
protected String output;
- @Parameter(names = "--logTopic", description = "Log Topic")
+ @Parameter(names = "--logTopic", description = "Log topic")
protected String logTopic;
- @Parameter(names = "--customSerdeInputs", description = "Map of input
topic to serde classname")
+ @Parameter(names = "--customSerdeInputs", description = "Map of input
topic to SerDe class name")
protected String customSerdeInputString;
@Parameter(names = "--outputSerdeClassName", description = "Output
SerDe")
protected String outputSerdeClassName;
- @Parameter(names = "--functionConfigFile", description = "Function
Config")
+ @Parameter(names = "--functionConfigFile", description = "Path to a
YAML config file for the function")
protected String fnConfigFile;
- @Parameter(names = "--processingGuarantees", description = "Processing
Guarantees")
+ @Parameter(names = "--processingGuarantees", description = "Processing
guarantees applied to the function")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
- @Parameter(names = "--subscriptionType", description = "The type of
subscription")
+ @Parameter(names = "--subscriptionType", description = "The type of
subscription used by the function as a consumer")
protected FunctionConfig.SubscriptionType subscriptionType;
- @Parameter(names = "--userConfig", description = "User Config")
+ @Parameter(names = "--userConfig", description = "User-defined config
key/values")
protected String userConfigString;
- @Parameter(names = "--parallelism", description = "Function
Parallelism")
+ @Parameter(names = "--parallelism", description = "The function's
parallelism factor (i.e. the number of function instances to run)")
protected String parallelism;
protected FunctionConfig functionConfig;
@@ -256,7 +256,7 @@ private void doJavaSubmitChecks(FunctionConfig.Builder
functionConfigBuilder) {
functionConfigBuilder.getClassName(), jarFile));
} else if (!Reflections.classInJarImplementsIface(file,
functionConfigBuilder.getClassName(), Function.class)
&& !Reflections.classInJarImplementsIface(file,
functionConfigBuilder.getClassName(), java.util.function.Function.class)) {
- throw new IllegalArgumentException(String.format("Pulsar
function class %s in jar %s implements neither Function nor
java.util.function.Function",
+ throw new IllegalArgumentException(String.format("Pulsar
function class %s in jar %s implements neither
org.apache.pulsar.functions.api.Function nor java.util.function.Function",
functionConfigBuilder.getClassName(), jarFile));
}
@@ -444,10 +444,10 @@ private String getUniqueInput(FunctionConfig.Builder
builder) {
class LocalRunner extends FunctionConfigCommand {
// TODO: this should become bookkeeper url and it should be fetched
from pulsar client.
- @Parameter(names = "--stateStorageServiceUrl", description = "state
storage service url")
+ @Parameter(names = "--stateStorageServiceUrl", description = "State
storage service URL")
protected String stateStorageServiceUrl;
- @Parameter(names = "--brokerServiceUrl", description = "The pulsar
broker url")
+ @Parameter(names = "--brokerServiceUrl", description = "The Pulsar
broker URL")
protected String brokerServiceUrl;
@Override
@@ -618,16 +618,16 @@ void runCmd() throws Exception {
}
}
- @Parameters(commandDescription = "Trigger function")
+ @Parameters(commandDescription = "Triggers the specified Pulsar Function
with a supplied value")
class TriggerFunction extends FunctionCommand {
- @Parameter(names = "--triggerValue", description = "The value the
function needs to be triggered with")
+ @Parameter(names = "--triggerValue", description = "The value with
which you want to trigger the function")
protected String triggerValue;
- @Parameter(names = "--triggerFile", description = "The fileName that
contains data the function needs to be triggered with")
+ @Parameter(names = "--triggerFile", description = "The path to the
file that contains the data with which you'd like to trigger the function")
protected String triggerFile;
@Override
void runCmd() throws Exception {
if (triggerFile == null && triggerValue == null) {
- throw new RuntimeException("One of triggerValue/triggerFile
has to be present");
+ throw new RuntimeException("Either a trigger value or a
trigger filepath needs to be present");
}
String retval = fnAdmin.functions().triggerFunction(tenant,
namespace, functionName, triggerValue, triggerFile);
System.out.println(retval);
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java
new file mode 100644
index 000000000..489715c58
--- /dev/null
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+import java.util.stream.Collectors;
+
+public class ContextFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ Logger LOG = context.getLogger();
+ String inputTopics =
context.getInputTopics().stream().collect(Collectors.joining(", "));
+ String functionName = context.getFunctionName();
+
+ String logMessage = String.format("A message with a value of \"%s\"
has arrived on one of the following topics: %s\n",
+ input,
+ inputTopics);
+
+ LOG.info(logMessage);
+
+ String metricName = String.format("function-%s-messages-received",
functionName);
+ context.recordMetric(metricName, 1);
+
+ return null;
+ }
+}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
index 7bf165767..9bc25db92 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
@@ -21,14 +21,12 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
+import java.util.Arrays;
+
public class CounterFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
- String[] parts = input.split("\\.");
-
- for (String part : parts) {
- context.incrCounter(part, 1);
- }
+ Arrays.asList(input.split("\\.")).forEach(word ->
context.incrCounter(word, 1));
return null;
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
index ae8ef7619..097c12b5b 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
@@ -22,9 +22,8 @@
import org.apache.pulsar.functions.api.Function;
public class ExclamationFunction implements Function<String, String> {
-
@Override
public String process(String input, Context context) {
- return input + "!";
+ return String.format("%s!", input);
}
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java
index ddd45d7e4..4653e1bba 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclmationFunction.java
@@ -23,6 +23,6 @@
public class JavaNativeExclmationFunction implements Function<String, String> {
@Override
public String apply(String input) {
- return input + "!";
+ return String.format("%s!", input);
}
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
index 5d2ff5a2a..9cf3f550a 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.functions.api.examples;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
/**
* A function with logging example.
@@ -29,17 +31,17 @@
private static final AtomicIntegerFieldUpdater<LoggingFunction>
COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(LoggingFunction.class, "counter");
- private volatile int counter = 0;
@Override
public String process(String input, Context context) {
+ Logger LOG = context.getLogger();
int counterLocal = COUNTER_UPDATER.incrementAndGet(this);
if ((counterLocal & Integer.MAX_VALUE) % 100000 == 0) {
- context.getLogger().info("Handled {} messages", counterLocal);
+ LOG.info("Handled {} messages", counterLocal);
}
- return input + "!";
+ return String.format("%s!", input);
}
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
index 2a9b95c3f..e23d37f9a 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
@@ -22,7 +22,6 @@
import org.apache.pulsar.functions.api.Function;
public class UserMetricFunction implements Function<String, Void> {
-
@Override
public Void process(String input, Context context) {
context.recordMetric("MyMetricName", 1);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 40d2c0b62..f6d6523ef 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -107,7 +107,7 @@ public Response registerFunction(final @PathParam("tenant")
String tenant,
log.error("Function {}/{}/{} already exists", tenant, namespace,
functionName);
return Response.status(Status.BAD_REQUEST)
.type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(String.format("Function %s already
exist", functionName))).build();
+ .entity(new ErrorData(String.format("Function %s already
exists", functionName))).build();
}
// function state
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 0f182d1cc..50c6b7b36 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -362,7 +362,7 @@ public void testRegisterExistedFunction() throws
IOException {
Response response = registerDefaultFunction();
assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
- assertEquals(new ErrorData("Function " + function + " already
exist").reason, ((ErrorData) response.getEntity()).reason);
+ assertEquals(new ErrorData("Function " + function + " already
exists").reason, ((ErrorData) response.getEntity()).reason);
}
@Test
diff --git a/site/Gemfile b/site/Gemfile
index 356d837d4..7ca13afbf 100644
--- a/site/Gemfile
+++ b/site/Gemfile
@@ -20,4 +20,4 @@
source 'https://rubygems.org'
ruby '2.3.1'
-gem 'jekyll', '3.7.0'
+gem 'jekyll', '3.7.3'
diff --git a/site/Gemfile.lock b/site/Gemfile.lock
index 9cbb328f9..b4f5b46e6 100644
--- a/site/Gemfile.lock
+++ b/site/Gemfile.lock
@@ -9,12 +9,12 @@ GEM
eventmachine (>= 0.12.9)
http_parser.rb (~> 0.6.0)
eventmachine (1.2.5)
- ffi (1.9.18)
+ ffi (1.9.23)
forwardable-extended (2.6.0)
http_parser.rb (0.6.0)
- i18n (0.9.1)
+ i18n (0.9.5)
concurrent-ruby (~> 1.0)
- jekyll (3.7.0)
+ jekyll (3.7.3)
addressable (~> 2.4)
colorator (~> 1.0)
em-websocket (~> 0.5)
@@ -27,7 +27,7 @@ GEM
pathutil (~> 0.9)
rouge (>= 1.7, < 4)
safe_yaml (~> 1.0)
- jekyll-sass-converter (1.5.1)
+ jekyll-sass-converter (1.5.2)
sass (~> 3.4)
jekyll-watch (2.0.0)
listen (~> 3.0)
@@ -40,11 +40,11 @@ GEM
mercenary (0.3.6)
pathutil (0.16.1)
forwardable-extended (~> 2.6)
- public_suffix (3.0.1)
- rb-fsevent (0.10.2)
+ public_suffix (3.0.2)
+ rb-fsevent (0.10.3)
rb-inotify (0.9.10)
ffi (>= 0.5.0, < 2)
- rouge (3.1.0)
+ rouge (3.1.1)
ruby_dep (1.5.0)
safe_yaml (1.0.4)
sass (3.5.5)
@@ -57,7 +57,7 @@ PLATFORMS
ruby
DEPENDENCIES
- jekyll (= 3.7.0)
+ jekyll (= 3.7.3)
RUBY VERSION
ruby 2.3.1p112
diff --git a/site/_config.local.yml b/site/_config.local.yml
index a4b9f9f47..4a3af200a 100644
--- a/site/_config.local.yml
+++ b/site/_config.local.yml
@@ -19,5 +19,3 @@
destination: generated
baseurl: ""
-include:
-- docs/example.md
diff --git a/site/_config.yml b/site/_config.yml
index a0880e6b4..94ad42c08 100644
--- a/site/_config.yml
+++ b/site/_config.yml
@@ -25,6 +25,7 @@ pulsar_repo:
https://github.com/apache/incubator-pulsar/tree/master
baseurl: /
destination: ../generated-site/content
+preview_version: 2.0.0-streamlio-6
current_version: 1.22.0-incubating
archived_releases:
- 1.21.0-incubating
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 7260e5d02..ef56909cd 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -114,6 +114,193 @@ commands:
- name: update-peer-clusters
description: Update peer cluster names
argument: peer-cluster-names
+- name: functions
+ description: A command-line interface for Pulsar Functions
+ subcommands:
+ - name: localrun
+ description: Run a Pulsar Function locally
+ options:
+ - flags: --brokerServiceUrl
+ description: The URL of the Pulsar broker
+ - flags: --className
+ description: The name of the function's class
+ - flags: --customSerdeInputs
+ description: A map of the input topic to SerDe name
+ - flags: --functionConfigFile
+ description: The path of the YAML config file used to configure the
function
+ - flags: --inputs
+ description: The input topics for the function (as a comma-separated
list if more than one topic is desired)
+ - flags: --logTopic
+ description: The topic to which logs from this function are published
+ - flags: --jar
+ description: A path to the JAR file for the function (if the function is
written in Java)
+ - flags: --name
+ description: The name of the function
+ default: The value specified by `--className`
+ - flags: --namespace
+ description: The function's namespace
+ - flags: --output
+ description: The name of the topic to which the function publishes its
output (if any)
+ - flags: --outputSerdeClassName
+ description: The SerDe class used for the function's output
+ - flags: --parallelism
+ description: The function's parallelism factor, i.e. the number of
instances of the function to run
+ default: 1
+ - flags: --processingGuarantees
+ description: "The processing guarantees applied to the function. Can be
one of: `ATLEAST_ONCE`, `ATMOST_ONCE`, or `EFFECTIVELY_ONCE`"
+ default: ATLEAST_ONCE
+ - flags: --py
+ description: The path of the Python file containing the function's
processing logic (if the function is written in Python)
+ - flags: --stateStorageServiceUrl
+ description: The service URL for the function's state storage (if the
function uses a storage system different from the Apache BookKeeper cluster
used by Pulsar)
+ - flags: --subscriptionType
+ description: The subscription type used by the function when consuming
messages on the input topic(s). Can be either `SHARED` or `EXCLUSIVE`
+ default: SHARED
+ - flags: --tenant
+ description: The function's tenant
+ - flags: --userConfig
+ description: A user-supplied config value, set as a key/value pair. You
can set multiple user config values.
+ - name: create
+ description: Creates a new Pulsar Function on the target infrastructure
+ options:
+ - flags: --className
+ description: The name of the function's class
+ - flags: --customSerdeInputs
+ description: A map of the input topic to SerDe name
+ - flags: --functionConfigFile
+ description: The path of the YAML config file used to configure the
function
+ - flags: --inputs
+ description: The input topics for the function (as a comma-separated
list if more than one topic is desired)
+ - flags: --jar
+ description: A path to the JAR file for the function (if the function is
written in Java)
+ - flags: --name
+ description: The name of the function
+ default: The value specified by `--className`
+ - flags: --namespace
+ description: The function's namespace
+ - flags: --logTopic
+ description: The topic to which logs from this function are published
+ - flags: --output
+ description: The name of the topic to which the function publishes its
output (if any)
+ - flags: --outputSerdeClassName
+ description: The SerDe class used for the function's output
+ - flags: --parallelism
+ description: The function's parallelism factor, i.e. the number of
instances of the function to run
+ default: 1
+ - flags: --processingGuarantees
+ description: "The processing guarantees applied to the function. Can be
one of: `ATLEAST_ONCE`, `ATMOST_ONCE`, or `EFFECTIVELY_ONCE`"
+ default: ATLEAST_ONCE
+ - flags: --py
+ description: The path of the Python file containing the function's
processing logic (if the function is written in Python)
+ - flags: --subscriptionType
+ description: The subscription type used by the function when consuming
messages on the input topic(s). Can be either `SHARED` or `EXCLUSIVE`
+ default: SHARED
+ - flags: --tenant
+ description: The function's tenant
+ - flags: --userConfig
+ description: A user-supplied config value, set as a key/value pair. You
can set multiple user config values.
+ - name: delete
+ description: Deletes an existing Pulsar Function
+ options:
+ - flags: --name
+ description: The name of the function to delete
+ - flags: --namespace
+ description: The namespace of the function to delete
+ - flags: --tenant
+ description: The tenant of the function to delete
+ - name: update
+ description: Updates an existing Pulsar Function
+ options:
+ - flags: --className
+ description: The name of the function's class
+ - flags: --customSerdeInputs
+ description: A map of the input topic to SerDe name
+ - flags: --functionConfigFile
+ description: The path of the YAML config file used to configure the
function
+ - flags: --inputs
+ description: The input topics for the function (as a comma-separated
list if more than one topic is desired)
+ - flags: --jar
+ description: A path to the JAR file for the function (if the function is
written in Java)
+ - flags: --name
+ description: The name of the function
+ default: The value specified by `--className`
+ - flags: --namespace
+ description: The function's namespace
+ - flags: --logTopic
+ description: The topic to which logs from this function are published
+ - flags: --output
+ description: The name of the topic to which the function publishes its
output (if any)
+ - flags: --outputSerdeClassName
+ description: The SerDe class used for the function's output
+ - flags: --parallelism
+ description: The function's parallelism factor, i.e. the number of
instances of the function to run
+ default: 1
+ - flags: --processingGuarantees
+ description: "The processing guarantees applied to the function. Can be
one of: `ATLEAST_ONCE`, `ATMOST_ONCE`, or `EFFECTIVELY_ONCE`"
+ default: ATLEAST_ONCE
+ - flags: --py
+ description: The path of the Python file containing the function's
processing logic (if the function is written in Python)
+ - flags: --subscriptionType
+ description: The subscription type used by the function when consuming
messages on the input topic(s). Can be either `SHARED` or `EXCLUSIVE`
+ default: SHARED
+ - flags: --tenant
+ description: The function's tenant
+ - flags: --userConfig
+ description: A user-supplied config value, set as a key/value pair. You
can set multiple user config values.
+ - name: get
+ description: Fetch information about an existing Pulsar Function
+ options:
+ - flags: --name
+ description: The name of the function about which you want to fetch
information
+ - flags: --namespace
+ description: The namespace of the function about which you want to fetch
information
+ - flags: --tenant
+ description: The tenant of the function about which you want to fetch
information
+ - name: getstatus
+ description: Get the status of an existing Pulsar Function
+ options:
+ - flags: --name
+ description: The name of the function about which you want to fetch
status information
+ - flags: --namespace
+ description: The namespace of the function about which you want to fetch
status information
+ - flags: --tenant
+ description: The tenant of the function about which you want to fetch
status information
+ - name: list
+ description: List all Pulsar Functions for a specific tenant and namespace
+ options:
+ - flags: --namespace
+ description: The namespace of the functions you want to list
+ - flags: --tenant
+ description: The tenant of the functions you want to list
+ - name: querystate
+ description: Retrieve the current state of a Pulsar Function by key
+ options:
+ - flags: -k, --key
+ description: The key for the state you want to fetch
+ - flags: --name
+ description: The name of the function whose state you want to query
+ - flags: --namespace
+ description: The namespace of the function whose state you want to query
+ - flags: --tenant
+ description: The tenant of the function whose state you want to query
+ - flags: -u, --storage-service-url
+ description: The service URL for the function's state storage (if the
function uses a storage system different from the Apache BookKeeper cluster
used by Pulsar)
+ - flags: -w, --watch
+ description: If set, watching for state changes is enabled
+ default: "false"
+ - name: trigger
+ description: Triggers the specified Pulsar Function with a supplied value
or file data
+ options:
+ - flags: --name
+ description: The name of the Pulsar Function to trigger
+ - flags: --namespace
+ description: The namespace of the Pulsar Function to trigger
+ - flags: --tenant
+ description: The tenant of the Pulsar Function to trigger
+ - flags: --triggerFile
+ description: The path to the file containing the data with which the
Pulsar Function is to be triggered
+ - flags: --triggerValue
+ description: The value with which the Pulsar Function is to be triggered
- name: namespaces
description: Operations about namespaces
subcommands:
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index 77df630ca..445b4a6d8 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -18,6 +18,9 @@
#
configs:
+- name: functionsWorkerEnabled
+ description: Whether the Pulsar Functions worker service is enabled in the
broker
+ default: 'false'
- name: zookeeperServers
default: ''
description: Zookeeper quorum connection string
diff --git a/site/_data/features.yaml b/site/_data/features.yaml
index bf04a339f..110eef924 100644
--- a/site/_data/features.yaml
+++ b/site/_data/features.yaml
@@ -17,8 +17,13 @@
# under the License.
#
+- title: Pulsar Functions
+ content: Easily deploy lightweight compute logic using developer-friendly
APIs without needing to run your own stream processing engine
+ endpoint: functions/quickstart
+ new: true
+
- title: Proven in production
- content: Pulsar has run in production at Yahoo scale for over 3 years, with
millions of messages per second across millions of topics.
+ content: Pulsar has run in production at Yahoo scale for over 3 years, with
millions of messages per second across millions of topics
endpoint: getting-started/ConceptsAndArchitecture
- title: Horizontally scalable
@@ -27,11 +32,7 @@
- title: Low latency with durability
content: Designed for low publish latency (< 5ms) at scale with strong
durabilty guarantees
- endpoint: getting-started/ConceptsAndArchitecture/#Architectureoverview
-
-- title: High throughput
- content: Proven in production at Yahoo scale with millions of messages per
second
- endpoint: getting-started/ConceptsAndArchitecture/#Architectureoverview
+ endpoint: getting-started/ConceptsAndArchitecture/#architecture-overview
- title: Geo-replication
content: Designed for configurable replication between data centers across
multiple geographic regions
@@ -45,7 +46,7 @@
- title: Persistent storage
content: |
Persistent message storage based on [Apache
BookKeeper](http://bookkeeper.apache.org/).
- Provides IO level isolation between write and read operations
+ Provides IO-level isolation between write and read operations
endpoint: getting-started/ConceptsAndArchitecture#persistent-storage
- title: Client libraries
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index 645d2268a..79e1cff80 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -32,19 +32,20 @@ groups:
- title: Pulsar Functions
dir: functions
+ new: true
docs:
- title: Getting started with Pulsar Functions
endpoint: quickstart
#- title: Pulsar Functions overview
# endpoint: overview
- #- title: The Pulsar Functions API
- # endpoint: api
- #- title: Deploying Pulsar Functions
- # endpoint: deployment
- #- title: Processing guarantees
- # endpoint: guarantees
- #- title: Metrics and stats for Pulsar Functions
- # endpoint: metrics-and-stats
+ - title: The Pulsar Functions API
+ endpoint: api
+ - title: Deploying Pulsar Functions
+ endpoint: deployment
+ - title: Processing guarantees
+ endpoint: guarantees
+ - title: Metrics for Pulsar Functions
+ endpoint: metrics
- title: Deployment
dir: deployment
diff --git a/site/_includes/cli.html b/site/_includes/cli.html
index f60ca399d..d6320ff08 100644
--- a/site/_includes/cli.html
+++ b/site/_includes/cli.html
@@ -61,7 +61,7 @@ <h5>Options</h5>
</td>
<td>{{ option.description | markdownify }}</td>
<td>
- {% if option.default %}<code class="highlighter-rouge">{{
option.default }}</code>{% endif %}
+ {% if option.default %}{{ option.default | markdownify }}{%
endif %}
</td>
</tr>
{% endfor %} <!-- for option in options -->
@@ -128,7 +128,7 @@ <h5>Options</h5>
</td>
<td>{{ option.description | markdownify }}</td>
<td>
- {% if option.default %}<code class="highlighter-rouge">{{
option.default }}</code>{% endif %}
+ {% if option.default %}{{ option.default | markdownify }}{%
endif %}
</td>
</tr>
{% endfor %} <!-- for option in options -->
@@ -181,7 +181,7 @@ <h5>Options</h5>
</td>
<td>{{ option.description | markdownify }}</td>
<td>
- {% if option.default %}<code
class="highlighter-rouge">{{ option.default }}</code>{% endif %}
+ {% if option.default %}{{ option.default | markdownify
}}{% endif %}
</td>
</tr>
{% endfor %} <!-- for option in options -->
diff --git a/site/_includes/head.html b/site/_includes/head.html
index b0c69d5fa..445f4027a 100644
--- a/site/_includes/head.html
+++ b/site/_includes/head.html
@@ -18,13 +18,6 @@
under the License.
-->
-<script type="text/javascript">
- var shiftWindow = function() { scrollBy(0, -108) };
- window.addEventListener("hashchange", shiftWindow);
- window.addEventListener("pageshow", shiftWindow);
- function load() { if (window.location.hash) shiftWindow(); }
-</script>
-
<title>{{ page.title }}</title>
<meta charset="utf-8">
diff --git a/site/_includes/sidebar.html b/site/_includes/sidebar.html
index 8dffae4ee..c034ea5ab 100644
--- a/site/_includes/sidebar.html
+++ b/site/_includes/sidebar.html
@@ -28,7 +28,7 @@
<div class="card-header" role="tab" id="heading-{{ slug }}">
<h5>
<a data-toggle="collapse" data-parent="#sidebar-accordion"
href="#collapse-{{ slug }}"{% if false %} aria-exanded="true"{% endif %}
aria-controls="collapse-{{ slug }}">
- {{ group.title }}
+ {{ group.title }}{% if group.new %} <span class="badge
badge-warning">New</span>{% endif %}
</a>
</h5>
</div>
diff --git a/site/_layouts/default.html b/site/_layouts/default.html
index 319a50917..752ea5a8d 100644
--- a/site/_layouts/default.html
+++ b/site/_layouts/default.html
@@ -59,5 +59,13 @@
{% if jekyll.environment == "production" %}
{% include google-analytics.html %}
{% endif %}
+
+ <script type="text/javascript">
+ var navbarOffset = -1 *
(document.getElementsByClassName("navbar")[0].offsetHeight);
+ var shiftWindow = function() { scrollBy(0, navbarOffset) };
+ window.addEventListener("hashchange", shiftWindow);
+ window.addEventListener("pageshow", shiftWindow);
+ function load() { if (window.location.hash) shiftWindow(); }
+ </script>
</body>
</html>
diff --git a/site/_layouts/docs.html b/site/_layouts/docs.html
index dfecfc9dc..008d8995a 100644
--- a/site/_layouts/docs.html
+++ b/site/_layouts/docs.html
@@ -32,7 +32,7 @@
<article class="col-xs-12 col-sm-12 col-md-12 col-lg-7">
<section class="docs-header">
- <h1 class="docs-title">{{ page.title }}</h1>
+ <h1 class="docs-title">{{ page.title }}{% if page.new %} <span
class="badge badge-warning">New</span>{% endif %}</h1>
{% if page.lead %}<span class="docs-lead">{{ page.lead }}</span><br
/>{% endif %}
<section class="tags">
{% for tag in page.tags %}
@@ -44,7 +44,7 @@ <h1 class="docs-title">{{ page.title }}</h1>
{% include version-warning.html %}
- <hr class="hr">
+ <!-- <hr class="hr"> -->
</section>
<section class="content">
diff --git a/site/_plugins/custom_tags.rb b/site/_plugins/custom_tags.rb
index ff82ef3df..961a9f329 100644
--- a/site/_plugins/custom_tags.rb
+++ b/site/_plugins/custom_tags.rb
@@ -37,6 +37,14 @@ def initialize(tag_name, text, tokens)
@term = 'persistent'
end
+ if @term == 'consume'
+ @term = 'consumer'
+ end
+
+ if ['produce', 'producing'].include? @term
+ @term = 'producer'
+ end
+
if @term == 'multi-tenant'
@term = 'multi-tenancy'
end
diff --git a/site/_sass/_admonition.scss b/site/_sass/_admonition.scss
index 30144e688..c889f2512 100644
--- a/site/_sass/_admonition.scss
+++ b/site/_sass/_admonition.scss
@@ -41,6 +41,10 @@
.admonition {
margin: 1rem 0;
+ p + .highlighter-rouge {
+ margin-top: .75rem;
+ }
+
p {
margin: 0;
diff --git a/site/_sass/_docs.scss b/site/_sass/_docs.scss
index c8b3e4df6..42277f070 100644
--- a/site/_sass/_docs.scss
+++ b/site/_sass/_docs.scss
@@ -58,6 +58,10 @@
h1.docs-title {
font-size: 40px;
line-height: 3.2rem;
+
+ .badge {
+ margin-left: .75rem;
+ }
}
span.docs-lead {
@@ -125,7 +129,7 @@
.highlighter-rouge {
font-family: $font-family-monospace;
- margin-bottom: 1rem;
+ margin: .75rem 0;
.highlight {
margin: 0;
diff --git a/site/docs/latest/deployment/Monitoring.md
b/site/docs/latest/deployment/Monitoring.md
index 2e1ae4d74..ea9f6df8b 100644
--- a/site/docs/latest/deployment/Monitoring.md
+++ b/site/docs/latest/deployment/Monitoring.md
@@ -23,15 +23,12 @@ title: Monitoring
-->
-There are different ways to monitor a Pulsar cluster, exposing both metrics
relative to the
-usage of topics and the overall health of the individual components of the
cluster.
+There are different ways to monitor a Pulsar cluster, exposing both metrics
relative to the usage of topics and the overall health of the individual
components of the cluster.
## Collecting metrics
### Broker stats
-The [`pulsar-admin`](../../reference/CliTools#pulsar-admin) tool
-
Pulsar {% popover broker %} metrics can be collected from brokers and exported
in JSON format. There are two main types of metrics:
* *Destination dumps*, which containing stats for each individual topic. They
can be fetched using
diff --git a/site/docs/latest/functions/api.md
b/site/docs/latest/functions/api.md
index 6e7581d7d..1bbf3ab33 100644
--- a/site/docs/latest/functions/api.md
+++ b/site/docs/latest/functions/api.md
@@ -1,49 +1,184 @@
---
title: The Pulsar Functions API
+new: true
---
-Pulsar Functions provides an easy-to-use API that develoeprs can use to
+[Pulsar Functions](../overview) provides an easy-to-use API that developers
can use to create and manage processing logic for the Apache Pulsar messaging
system. With Pulsar Functions, you can write functions of any level of
complexity in [Java](#java) or [Python](#python) and run them in conjunction
with a Pulsar cluster without needing to run a separate stream processing
engine.
+
+{% include admonition.html type="info" content="For a more in-depth overview
of the Pulsar Functions feature, see the [Pulsar Functions
overview](../overview)." %}
## Core programming model
-Pulsar Functions
+Pulsar Functions provide a wide range of functionality but are based on a very
simple programming model. You can think of Pulsar Functions as lightweight
processes that
+
+* {% popover consume %} messages from one or more Pulsar {% popover topics %}
and then
+* apply some user-defined processing logic to each incoming message. That
processing logic could be just about anything you want, including
+ * {% popover producing %} the resulting, processed message on another Pulsar
topic, or
+ * doing something else with the message, such as writing results to an
external database.
+
+You could use Pulsar Functions, for example, to set up the following
processing chain:
+
+* A [Python](#python) function listens on the `raw-sentences` topic and
"[sanitizes](#example-function)" incoming strings (removing extraneous
whitespace and converting all characters to lower case) and then publishes the
results to a `sanitized-sentences` topic
+* A [Java](#java) function listens on the `sanitized-sentences` topic, counts
the number of times each word appears within a specified time window, and
publishes the results to a `results` topic
+* Finally, a Python function listens on the `results` topic and writes the
results to a MySQL table
+
+### Example function
+
+Here's an example "input sanitizer" function written in Python and stored in a
`sanitizer.py` file:
+
+```python
+def clean_string(s):
+ return s.strip().lower()
+
+def process(input):
+ return clean_string(input)
+```
+
+Some things to note about this Pulsar Function:
+
+* There is no client, producer, or consumer object involved. All message
"plumbing" is already taken care of for you, enabling you to worry only about
processing logic.
+* No topics, subscription types, {% popover tenants %}, or {% popover
namespaces %} are specified in the function logic itself. Instead, topics are
specified upon [deployment](#example-deployment). This means that you can use
and re-use Pulsar Functions across topics, tenants, and namespaces without
needing to hard-code those attributes.
+
+### Example deployment
+
+Deploying Pulsar Functions is handled by the
[`pulsar-admin`](../../reference/CliTools#pulsar-admin) CLI tool, in particular
the [`functions`](../../reference/CliTools#pulsar-admin-functions) command.
Here's an example command that would run our [sanitizer](#example-function)
function from above in [local run](../deployment#local-run) mode:
+
+```bash
+$ bin/pulsar-admin functions localrun \
+ --py sanitizer.py \
+ --className sanitizer \
+ --tenant sample \
+ --namespace ns1
+```
+
+For instructions on running functions in your Pulsar cluster, see the
[Deploying Pulsar Functions](../deployment) guide.
+
+### Available APIs {#apis}
+
+In both Java and Python, you have two options for writing Pulsar Functions:
+
+Interface | Description | Use cases
+:---------|:------------|:---------
+Language-native interface | No Pulsar-specific libraries or special
dependencies required (only core libraries from Java/Python) | Functions that
don't require access to the function's [context](#context)
+Pulsar Function SDK for Java/Python | Pulsar-specific libraries that provide a
range of functionality not provided by "native" interfaces | Functions that
require access to the function's [context](#context)
+
+In Python, for example, this language-native function, which adds an
exclamation point to all incoming strings and publishes the resulting string to
a topic, would have no external dependencies:
+
+```python
+def process(input):
+ return "{}!".format(input)
+```
-### Source and sink topics
+This function, however, would use the Pulsar Functions [SDK for
Python](#python-sdk):
+
+```python
+from pulsar import Function
-All Pulsar Functions have one or more **source topics** that supply messages
to the function.
+class DisplayFunctionName(Function):
+ def process(self, input, context):
+ function_name = context.function_name()
+ return "The function processing this message has the name
{0}".format(function_name)
+```
-### Sink topic
+### Serialization and deserialization (SerDe) {#serde}
-At the moment, Pulsar Functions can have at most one **sink topic** to which
processing results are published.
+SerDe stands for **Ser**ialization and **De**serialization. All Pulsar
Functions use SerDe for message handling. How SerDe works by default depends on
the language you're using for a particular function:
-### SerDe
+* In [Python](#python-serde), the default SerDe is identity, meaning that the
type is serialized as whatever type the producer function returns
+* In [Java](#java-serde), a number of commonly used types (`String`s,
`Integer`s, etc.) are supported by default
-SerDe stands for **Ser**ialization and **De**serialization
+In both languages, however, you can write your own custom SerDe logic for more
complex, application-specific types. See the docs for [Java](#java-serde) and
[Python](#python-serde) for language-specific instructions.
-## Context
+### Context
-Both the [Java](#java-functions-with-context) and
[Python](#python-functions-with-context) APIs provide optional access to a
**context object** that can be used by the function. This context object
provides a wide variety of information to the function:
+Both the [Java](#java-sdk) and [Python](#python-sdk) SDKs provide access to a
**context object** that can be used by the function. This context object
provides a wide variety of information and functionality to the function:
* The name and ID of the Pulsar Function
-* The message ID of each message
+* The message ID of each message. Each Pulsar {% popover message %} is
automatically assigned an ID.
* The name of the topic on which the message was sent
-* The names of all [source topics](#source-topics) and the [sink
topics](#sink-topic) associated with the function
+* The names of all input topics as well as the output topic associated with
the function
* The name of the class used for [SerDe](#serde)
* The {% popover tenant %} and {% popover namespace %} associated with the
function
* The ID of the Pulsar Functions instance running the function
* The version of the function
-* The logger object used by the function
+* The [logger object](#logging) used by the function, which can be used to
create function log messages
+* Access to arbitrary [user config](#user-config) values supplied via the CLI
+* An interface for recording [metrics](../metrics)
+
+### User config
+
+When you run or update Pulsar Functions created using the [SDK](#apis), you
can pass arbitrary key/values to them via the command line with the
`--userConfig` flag. Key/values must be specified as JSON. Here's an example of
a function creation command that passes a user config key/value to a function:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --name word-filter \
+ # Other function configs
+ --userConfig '{"forbidden-word":"rosebud"}'
+```
+
+If the function were a Python function, that config value could be accessed
like this:
+
+```python
+from pulsar import Function
+
+class WordFilter(Function):
+ def process(self, input, context):
+ forbidden_word = context.user_config()["forbidden-word"]
+
+ # Don't publish the message if it contains the user-supplied
+ # forbidden word
+ if forbidden_word in input:
+ pass
+ # Otherwise publish the message
+ else:
+ return input
+```
-## Java
+## Pulsar Functions for Java {#java}
Writing Pulsar Functions in Java involves implementing one of two interfaces:
* The
[`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html)
interface
-* The {% javadoc PulsarFunction client
org.apache.pulsar.functions.api.Function %} interface. This interface works
much like the `java.util.Function` ihterface, but with the important difference
+* The {% javadoc Function client org.apache.pulsar.functions.api.Function %}
interface. This interface works much like the `java.util.Function` interface,
but with the important difference that it provides a {% javadoc Context client
org.apache.pulsar.functions.api.Context %} object that you can use in a
[variety of ways](#context)
+
+### Getting started
+
+In order to write Pulsar Functions in Java, you'll need to install the proper
[dependencies](#java-dependencies) and package your function [as a
JAR](#java-packaging).
+
+#### Dependencies {#java-dependencies}
+
+How you get started writing Pulsar Functions in Java depends on which API
you're using:
+
+* If you're writing a [Java native function](#java-native), you won't need any
external dependencies.
+* If you're writing a [Java SDK](#java-sdk) function, you'll need to import
the `pulsar-functions-api` library.
+
+ Here's an example for a Maven `pom.xml` configuration file:
+
+ ```xml
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-api</artifactId>
+ <version>2.0.0-incubating-SNAPSHOT</version>
+ </dependency>
+ ```
+
+ Here's an example for a Gradle `build.gradle` configuration file:
+
+ ```groovy
+ dependencies {
+ compile group: 'org.apache.pulsar', name: 'pulsar-functions-api', version:
'2.0.0-incubating-SNAPSHOT'
+ }
+ ```
+
+#### Packaging {#java-packaging}
+
+Whether you're writing Java Pulsar Functions using the [native](#java-native)
Java `java.util.Function` interface or using the [Java SDK](#java-sdk), you'll
need to package your function(s) as a "fat" JAR.
-### Java functions without context
+{% include admonition.html type="success" title="Starter repo" content="If
you'd like to get up and running quickly, you can use [this
repo](https://github.com/streamlio/pulsar-functions-java-starter), which
contains the necessary Maven configuration to build a fat JAR as well as some
example functions." %}
-If your function doesn't require access to its [context](#context), you can
implement the
[`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html)
interface, which has this very simply, one-method signature:
+### Java native functions {#java-native}
+
+If your function doesn't require access to its [context](#context), you can
create a Pulsar Function by implementing the
[`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html)
interface, which has this very simple, single-method signature:
```java
public interface Function<I, O> {
@@ -51,39 +186,30 @@ public interface Function<I, O> {
}
```
-Here's a simple example that takes a string as its input, adds an exclamation
point to the end of the string, and then publishes the resulting string:
+Here's an example function that takes a string as its input, adds an
exclamation point to the end of the string, and then publishes the resulting
string:
```java
import java.util.Function;
public class ExclamationFunction implements Function<String, String> {
-
+ @Override
+ public String apply(String input) {
+ return String.format("%s!", input);
+ }
}
```
-### Void functions
-
-Pulsar Functions can publish results to an output {% popover topic %}, but
this isn't required. Functions can also
-
+In general, you should use native functions when you don't need access to the
function's [context](#context). If you *do* need access to the function's
context, then we recommend using the [Pulsar Functions Java SDK](#java-sdk).
+### Java SDK functions {#java-sdk}
-```java
-import java.util.Function;
-public class ExclamationFunction implements Function<String, String> {
- @Override
- public String apply(String input) { return String.format("%s!", input); }
-}
-```
+To get started developing Pulsar Functions using the Java SDK, you'll need to
add a dependency on the `pulsar-functions-api` artifact to your project.
Instructions can be found [above](#java-dependencies).
-### Java functions with context
+{% include admonition.html type='success' content='An easy way to get up and
running with Pulsar Functions in Java is to clone the
[`pulsar-functions-java-starter`](https://github.com/streamlio/pulsar-functions-java-starter)
repo and follow the instructions there.' %}
-```java
-public interface PulsarFunction<I, O> {
- O process(I input, Context context) throws Exception;
-}
-```
+### Java context object {#java-context}
-Context interface:
+The {% javadoc Context client org.apache.pulsar.functions.api.Context %}
interface provides a number of methods that you can use to access the function's
[context](#context). The various method signatures for the `Context` interface
are listed below:
```java
public interface Context {
@@ -99,7 +225,6 @@ public interface Context {
String getInstanceId();
String getFunctionVersion();
Logger getLogger();
- void incrCounter(String key, long amount);
String getUserConfigValue(String key);
void recordMetric(String metricName, double value);
<O> CompletableFuture<Void> publish(String topicName, O object, String
serDeClassName);
@@ -108,9 +233,66 @@ public interface Context {
}
```
-### SerDe
+Here's an example function that uses several methods available via the
`Context` object:
-> Serde stands for **Ser**ialization and **De**serialization.
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+import java.util.stream.Collectors;
+
+public class ContextFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ Logger LOG = context.getLogger();
+ String inputTopics =
context.getInputTopics().stream().collect(Collectors.joining(", "));
+ String functionName = context.getFunctionName();
+
+ String logMessage = String.format("A message with a value of \"%s\"
has arrived on one of the following topics: %s\n",
+ input,
+ inputTopics);
+
+ LOG.info(logMessage);
+
+ String metricName = String.format("function-%s-messages-received",
functionName);
+ context.recordMetric(metricName, 1);
+
+ return null;
+ }
+}
+```
+
+### Void functions
+
+Pulsar Functions can publish results to an output {% popover topic %}, but
this isn't required. You can also have functions that simply produce a log,
write results to a database, etc. Here's a function that writes a simple log
every time a message is received:
+
+```java
+import org.slf4j.Logger;
+
+public class LogFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ Logger LOG = context.getLogger();
+ LOG.info("The following message was received: {}", input);
+ return null;
+ }
+}
+```
+
+{% include admonition.html type="warning" content="When using Java functions
in which the output type is `Void`, the function must *always* return `null`."
%}
+
+### Java SerDe
+
+Pulsar Functions use [SerDe](#serde) when publishing data to and consuming
data from Pulsar topics. When you're writing Pulsar Functions in Java, the
following basic Java types are built in and supported by default:
+
+* `String`
+* `Double`
+* `Integer`
+* `Float`
+* `Long`
+* `Short`
+* `Byte`
Built-in vs. custom. For custom, you need to implement this interface:
@@ -121,23 +303,133 @@ public interface SerDe<T> {
}
```
-The built-in is the `org.apache.pulsar.functions.api.DefaultSerDe` class:
+#### Java SerDe example
+
+Imagine that you're writing Pulsar Functions in Java that are processing tweet
objects. Here's a simple example `Tweet` class:
```java
+public class Tweet {
+ private String username;
+ private String tweetContent;
+
+ public Tweet(String username, String tweetContent) {
+ this.username = username;
+ this.tweetContent = tweetContent;
+ }
+ // Standard setters and getters
+}
```
-The built-in should work fine for basic Java types. For more advanced types,
+In order to be able to pass `Tweet` objects directly between Pulsar Functions,
you'll need to provide a custom SerDe class. In the example below, `Tweet`
objects are basically strings in which the username and tweet content are
separated by a `|`.
+```java
+package com.example.serde;
-## Python
+import org.apache.pulsar.functions.api.SerDe;
-```python
-def process(input):
+import java.util.regex.Pattern;
+
+public class TweetSerde implements SerDe<Tweet> {
+ public Tweet deserialize(byte[] input) {
+ String s = new String(input);
+ String[] fields = s.split(Pattern.quote("|"));
+ return new Tweet(fields[0], fields[1]);
+ }
+
+ public byte[] serialize(Tweet input) {
+ return String.format("%s|%s", input.getUsername(),
input.getTweetContent()).getBytes();
+ }
+}
```
-### With context
+To apply this custom SerDe to a particular Pulsar Function, you would need to:
-```python
-def
-```
\ No newline at end of file
+* Package the `Tweet` and `TweetSerde` classes into a JAR
+* Specify a path to the JAR and SerDe class name when deploying the function
+
+Here's an example
[`create`](../../reference/CliTools#pulsar-admin-functions-create) operation:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --jar /path/to/your.jar \
+ --outputSerdeClassName com.example.serde.TweetSerde \
+ # Other function attributes
+```
+
+{% include admonition.html type="warning" title="Custom SerDe classes must be
packaged with your function JARs" content="
+Pulsar does not store your custom SerDe classes separately from your Pulsar
Functions. That means that you'll need to always include your SerDe classes in
your function JARs. If not, Pulsar will return an error." %}
+
+### Java logging
+
+Pulsar Functions that use the [Java SDK](#java-sdk) have access to an
[SLF4j](https://www.slf4j.org/)
[`Logger`](https://www.slf4j.org/api/org/slf4j/Logger.html) object that
can be used to create logs. Here's a simple example function that logs either a
`WARNING`- or `INFO`-level log based on whether the incoming string contains
the word `danger`:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+public class LoggingFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ Logger LOG = context.getLogger();
+ String messageId = new String(context.getMessageId());
+
+ if (input.contains("danger")) {
+ LOG.warn("A warning was received in message {}", messageId);
+ } else {
+ LOG.info("Message {} received\nContent: {}", messageId, input);
+ }
+
+ return null;
+ }
+}
+```
+
+If you want your function to produce logs, you need to specify a log topic
when creating or running the function. Here's an example:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --jar my-functions.jar \
+ --className my.package.LoggingFunction \
+ --logTopic persistent://sample/standalone/ns1/logging-function-logs \
+ # Other function configs
+```
+
+Now, all logs produced by the `LoggingFunction` above can be accessed via the
`persistent://sample/standalone/ns1/logging-function-logs` topic.
+
+### Java user config
+
+The Java SDK's [`Context`](#java-context) object enables you to access
key/value pairs provided to the Pulsar Function via the command line (as JSON).
Here's an example function creation command that passes a key/value pair:
+
+```bash
+$ bin/pulsar-admin functions create \
+ # Other function configs
+ --userConfig '{"word-of-the-day":"verdure"}'
+```
+
+To access that value in a Java function:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+public class UserConfigFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ Logger LOG = context.getLogger();
+ String wotd = context.getUserConfigValue("word-of-the-day");
+ LOG.info("The word of the day is {}", wotd);
+ return null;
+ }
+}
+```
+
+The `UserConfigFunction` function will log the string `"The word of the day is
verdure"` every time the function is invoked (i.e. every time a message
arrives). The `word-of-the-day` user config will be changed only when the
function is updated with a new config value.
+
+{% include admonition.html type="info" content="For all key/value pairs passed
to Java Pulsar Functions, both the key *and* the value are `String`s. If you'd
like the value to be of a different type, you will need to deserialize from the
`String` type." %}
+
+## Pulsar Functions for Python {#python}
+
+Documentation for the Python SDK for Pulsar Functions is coming soon.
\ No newline at end of file
diff --git a/site/docs/latest/functions/deployment.md
b/site/docs/latest/functions/deployment.md
index ea2d79a97..23dc35ede 100644
--- a/site/docs/latest/functions/deployment.md
+++ b/site/docs/latest/functions/deployment.md
@@ -1,9 +1,173 @@
---
-title: Deploying Pulsar Functions
+title: Deploying and managing Pulsar Functions
+lead: Run Pulsar Functions in local run mode or in cluster mode
+new: true
---
-At the moment, Pulsar Functions are deployed
+At the moment, there are two deployment modes available for Pulsar Functions:
-## State storage
+Mode | Description
+:----|:-----------
+Local run mode | The function runs in your local environment, for example on
your laptop
+Cluster mode | The function runs *inside of* your Pulsar cluster, on the same
machines as your Pulsar {% popover brokers %}
-By default, Pulsar uses [Apache BookKeeper](https://bookkeeper.apache.org).
\ No newline at end of file
+{% include admonition.html type="info" title="Contributing new deployment
modes" content="The Pulsar Functions feature was designed, however, with
extensibility in mind. Other deployment options will be available in the
future. If you'd like to add a new deployment option, we recommend getting in
touch with the Pulsar developer community at
[dev@pulsar.incubator.apache.org](mailto:dev@pulsar.incubator.apache.org)." %}
+
+## Requirements
+
+In order to deploy and manage Pulsar Functions, you need to have a Pulsar {%
popover cluster %} running. There are several options for this:
+
+* You can run a [standalone cluster](../../getting-started/LocalCluster)
locally on your own machine
+* You can deploy a Pulsar cluster on
[Kubernetes](../../deployment/Kubernetes), [Amazon Web
Services](../../deployment/aws-cluster), [bare
metal](../../deployment/instance), [DC/OS](../../deployment/dcos), and more
+
+If you're running a non-{% popover standalone %} cluster, you'll need to
obtain the service URL for the cluster. How you obtain the service URL will
depend on how you deployed your Pulsar cluster.
+
+## Local run mode {#local-run}
+
+If you run a Pulsar Function in **local run** mode, it will run on the machine
from which the command is run (this could be your laptop, an [AWS
EC2](https://aws.amazon.com/ec2/) instance, etc.). Here's an example
[`localrun`](../../reference/CliTools#pulsar-admin-functions-localrun) command:
+
+```bash
+$ bin/pulsar-admin functions localrun \
+ --py myfunc.py \
+ --className myfunc.SomeFunction \
+ --inputs persistent://sample/standalone/ns1/input-1 \
+ --output persistent://sample/standalone/ns1/output-1
+```
+
+By default, the function will connect to a Pulsar cluster running on the same
machine, via a local {% popover broker %} service URL of
`pulsar://localhost:6650`. If you'd like to use local run mode to run a
function but connect it to a non-local Pulsar cluster, you can specify a
different broker URL using the `--brokerServiceUrl` flag. Here's an example:
+
+```bash
+$ bin/pulsar-admin functions localrun \
+ --brokerServiceUrl pulsar://my-cluster-host:6650 \
+ # Other function parameters
+```
+
+## Cluster mode
+
+When you run a Pulsar Function in **cluster mode**, the function code will be
uploaded to a Pulsar {% popover broker %} and run *alongside the broker* rather
than in your [local environment](#local-run). You can run a function in cluster
mode using the [`create`](../../reference/CliTools#pulsar-admin-functions-create)
command. Here's an example:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --py myfunc.py \
+ --className myfunc.SomeFunction \
+ --inputs persistent://sample/standalone/ns1/input-1 \
+ --output persistent://sample/standalone/ns1/output-1
+```
+
+### Updating cluster mode functions {#updating}
+
+You can use the [`update`](../../reference/CliTools#pulsar-admin-functions-update)
command to update a Pulsar Function running in cluster mode. This command, for
example, would update the function created in the section
[above](#cluster-mode):
+
+```bash
+$ bin/pulsar-admin functions update \
+ --py myfunc.py \
+ --className myfunc.SomeFunction \
+ --inputs persistent://sample/standalone/ns1/new-input-topic \
+ --output persistent://sample/standalone/ns1/new-output-topic
+```
+
+{% include admonition.html type="info" content="Something" %}
+
+### Parallelism
+
+Pulsar Functions run as processes called **instances**. When you run a Pulsar
Function, it runs as a single instance by default (and in [local run
mode](#local-run) you can *only* run a single instance of a function).
+
+You can also specify the *parallelism* of a function, i.e. the number of
instances to run, when you create the function. You can set the parallelism
factor using the `--parallelism` flag of the
[`create`](../../reference/CliTools#pulsar-admin-functions-create) command.
Here's an example:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --parallelism 3 \
+ # Other function info
+```
+
+You can adjust the parallelism of an already created function using the
[`update`](../../reference/CliTools#pulsar-admin-functions-update) interface.
+
+```bash
+$ bin/pulsar-admin functions update \
+ --parallelism 5 \
+ # Other function info
+```
+
+If you're specifying a function's configuration via YAML, use the
`parallelism` parameter. Here's an example config file:
+
+```yaml
+# function-config.yaml
+parallelism: 3
+inputs:
+- persistent://sample/standalone/ns1/input-1
+output: persistent://sample/standalone/ns1/output-1
+# other parameters
+```
+
+And here's the corresponding update command:
+
+```bash
+$ bin/pulsar-admin functions update \
+ --functionConfigFile function-config.yaml
+```
+
+## Triggering functions
+
+Whether a Pulsar Function is running in [local run](#local-run) or
[cluster](#cluster-mode) mode, you can **trigger** the function at any time using
the command line. Triggering a function means that you send a message with a
specific value to the function.
+
+{% include admonition.html type="info" content="Triggering a function is
ultimately no different from invoking a function by producing a message on one
of the function's input topics. The [`pulsar-admin functions
trigger`](../../reference/CliTools#pulsar-admin-functions-trigger) command is essentially
a convenient mechanism for sending messages to functions without needing to use
the [`pulsar-client`](../../reference/CliTools#pulsar-client) tool or a language-specific
client library." %}
+
+To show an example of function triggering, let's start with a simple [Python
function](../api#python) that returns a simple string based on the input:
+
+```python
+# myfunc.py
+def process(input):
+ return "This function has been triggered with a value of {0}".format(input)
+```
+
+Let's deploy that function in [cluster mode](#cluster-mode):
+
+```bash
+$ bin/pulsar-admin functions create \
+ --tenant sample \
+ --namespace ns1 \
+ --name myfunc \
+ --py myfunc.py \
+ --className myfunc \
+ --inputs persistent://sample/standalone/ns1/in \
+ --output persistent://sample/standalone/ns1/out
+```
+
+Now let's make a consumer listen on the output topic for messages coming from
the `myfunc` function using the [`pulsar-client
consume`](../../reference/CliTools#pulsar-client-consume) command:
+
+```bash
+$ bin/pulsar-client consume persistent://sample/standalone/ns1/out \
+ --subscription-name my-subscription \
+ --num-messages 0 # Listen indefinitely
+```
+
+Now let's trigger that function:
+
+```bash
+$ bin/pulsar-admin functions trigger \
+ --tenant sample \
+ --namespace ns1 \
+ --name myfunc \
+ --triggerValue "hello world"
+```
+
+The consumer listening on the output topic should then produce this in its
logs:
+
+```
+----- got message -----
+This function has been triggered with a value of hello world
+```
+
+{% include admonition.html type="success" title="Topic info not required"
content="In the `trigger` command above, you may have noticed that you only
need to specify basic information about the function (tenant, namespace, and
name). To trigger the function, you didn't need to know the function's input
topic(s)." %}
+
+<!--
+## Subscription types
+
+Pulsar supports three different [subscription
types](../../getting-started/ConceptsAndArchitecture#subscription-modes) (or
subscription modes) for Pulsar clients:
+
+* With [exclusive](../../getting-started/ConceptsAndArchitecture#exclusive)
subscriptions, only a single {% popover consumer %} is allowed to attach to the
subscription.
+* With [shared](../../getting-started/ConceptsAndArchitecture#shared) . Please
note that strict message ordering is *not* guaranteed with shared subscriptions.
+* With [failover](../../getting-started/ConceptsAndArchitecture#failover)
subscriptions
+
+Pulsar Functions can also be assigned a subscription type when you
[create](#cluster-mode) them or run them [locally](#local-run). In cluster
mode, the subscription can also be [updated](#updating) after the function has
been created.
+-->
\ No newline at end of file
diff --git a/site/docs/latest/functions/guarantees.md
b/site/docs/latest/functions/guarantees.md
index 3efd5494f..54a4635c4 100644
--- a/site/docs/latest/functions/guarantees.md
+++ b/site/docs/latest/functions/guarantees.md
@@ -1,22 +1,25 @@
---
title: Processing guarantees
lead: Apply at-most-once, at-least-once, or effectively-once delivery
semantics to Pulsar Functions
+new: true
---
-Pulsar Functions provides three different messaging semantics that you can
apply to any Function:
+Pulsar Functions provides three different messaging semantics that you can
apply to any function:
-* **At-most-once** delivery
-* **At-least-once** delivery
-* **Effectively-once** delivery
+Delivery semantics | Description
+:------------------|:-------
+**At-most-once** delivery | Each message that is sent to the function will
most likely be processed but also may not be (hence the "at most")
+**At-least-once** delivery | Each message that is sent to the function could
be processed more than once (hence the "at least")
+**Effectively-once** delivery | Each message that is sent to the function will
have one output associated with it. The function may be invoked more than once,
perhaps due to some kind of system failure, but the function will produce one
effect for each incoming message.
-## How it works
+## Applying processing guarantees to a function
-You can set the processing guarantees for a Pulsar Function when you create
the Function. This [`pulsar-function
create`](../../reference/CliTools#pulsar-functions-create) command, for
example, would apply effectively-once guarantees to the Function:
+You can set the processing guarantees for a Pulsar Function when you create
the function. This [`pulsar-admin functions
create`](../../reference/CliTools#pulsar-admin-functions-create) command, for
example, would apply effectively-once guarantees to the function:
```bash
$ bin/pulsar-functions \
- # TODO
- --processingGuarantees EFFECTIVELY_ONCE
+ --processingGuarantees EFFECTIVELY_ONCE \
+ # Other function configs
```
The available options are:
@@ -25,4 +28,8 @@ The available options are:
* `ATLEAST_ONCE`
* `EFFECTIVELY_ONCE`
-{% include admonition.html type='info' content='By default, Pulsar Functions
provide at-most-once delivery guarantees. If you create a function without
supplying a value for the `--processingGuarantees`flag, then the Function will
provide only at-most-once guarantees.' %}
\ No newline at end of file
+{% include admonition.html type="info" content="By default, Pulsar Functions
provide at-most-once delivery guarantees. So if you create a function without
supplying a value for the `--processingGuarantees` flag, then the function will
provide at-most-once guarantees." %}
+
+## Updating the processing guarantees of a function
+
+You can change the processing guarantees applied to a function once it's
already been created using the
[`update`](../../reference/CliTools#pulsar-admin-functions-update) command.
\ No newline at end of file
diff --git a/site/docs/latest/functions/metrics-and-stats.md
b/site/docs/latest/functions/metrics-and-stats.md
deleted file mode 100644
index 90f330657..000000000
--- a/site/docs/latest/functions/metrics-and-stats.md
+++ /dev/null
@@ -1,19 +0,0 @@
----
-title: Metrics and stats for Pulsar Functions
----
-
-Pulsar Functions can publish arbitrary metrics to the metrics interface (which
can then be queried).
-
-## Java API
-
-To publish a metric to the metrics interface:
-
-```java
-void recordMetric(String metricName, double value);
-```
-
-Here's an example:
-
-```java
-Context.recordMetric("my-custom-metrics", 475);
-```
\ No newline at end of file
diff --git a/site/docs/latest/functions/metrics.md
b/site/docs/latest/functions/metrics.md
new file mode 100644
index 000000000..496940f27
--- /dev/null
+++ b/site/docs/latest/functions/metrics.md
@@ -0,0 +1,41 @@
+---
+title: Metrics for Pulsar Functions
+new: true
+---
+
+Pulsar Functions can publish arbitrary metrics to the metrics interface which
can then be queried. This doc contains instructions for publishing metrics
using the [Java](#java-sdk) and [Python](#python-sdk) Pulsar Functions SDKs.
+
+{% include admonition.html type="warning" title="Metrics and stats not
available through language-native interfaces" content="If a Pulsar Function
uses the language-native interface for [Java](../api#java-native) or
[Python](../api#python-native), that function will not be able to publish metrics and
stats to Pulsar." %}
+
+## Accessing metrics
+
+For a guide to accessing metrics created by Pulsar Functions, see the guide to
[Monitoring](../../deployment/Monitoring) in Pulsar.
+
+## Java SDK
+
+If you're creating a Pulsar Function using the [Java SDK](../api#java-sdk),
the {% javadoc Context client org.apache.pulsar.functions.api.Context %} object
has a `recordMetric` method that you can use to register both a name for the
metric and a value. Here's the signature for that method:
+
+```java
+void recordMetric(String metricName, double value);
+```
+
+Here's an example function:
+
+```java
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+public class MetricRecordingFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ context.recordMetric("number-of-characters", input.length());
+ return null;
+ }
+}
+```
+
+This function counts the length of each incoming message (of type `String`)
and then registers that under the `number-of-characters` metric.
+
+## Python SDK
+
+Documentation for the [Python SDK](../api#python-sdk) is coming soon.
\ No newline at end of file
diff --git a/site/docs/latest/functions/overview.md
b/site/docs/latest/functions/overview.md
index 6dff8d478..e1cfbab3f 100644
--- a/site/docs/latest/functions/overview.md
+++ b/site/docs/latest/functions/overview.md
@@ -1,6 +1,7 @@
---
title: Pulsar Functions overview
lead: A bird's-eye look at Pulsar's lightweight, developer-friendly compute
platform
+new: true
---
@@ -25,7 +26,7 @@ Functions are executed each time a message is published to
the input topic. If a
> Pulsar features automatic message deduplication
-### Goals
+## Goals
Core goal: make Pulsar do real heavy lifting without needing to deploy a
neighboring system (Storm, Heron, Flink, etc.). Ready-made compute
infrastructure at your disposal.
@@ -70,6 +71,13 @@ Pulsar Functions can currently be written in
[Java](../../functions/api#java) an
### State storage
+Although you can certainly use Pulsar Functions to perform stateless
computations, many use cases demand robust state storage.
+
+
+You can certainly use Pulsar Functions to perform stateless operations,
+
+By default, Pulsar Functions use [Apache
BookKeeper](https://bookkeeper.apache.org) for state storage.
+
### Metrics
Here's an example function that publishes a value of 1 to the `my-metric`
metric.
@@ -89,3 +97,6 @@ public class MetricsFunction implements
PulsarFunction<String, Void> {
### Data types
* Strongly typed
+
+## YAML configuration {#yaml}
+
diff --git a/site/docs/latest/functions/quickstart.md
b/site/docs/latest/functions/quickstart.md
index 7a0143b24..54f5ae761 100644
--- a/site/docs/latest/functions/quickstart.md
+++ b/site/docs/latest/functions/quickstart.md
@@ -1,6 +1,7 @@
---
title: Getting started with Pulsar Functions
lead: Write and run your first Pulsar Function in just a few steps
+new: true
---
This tutorial will walk you through running a {% popover standalone %} Pulsar
{% popover cluster %} on your machine and then running your first Pulsar
Functions using that cluster. The first function will run in local run mode
(outside your Pulsar {% popover cluster %}), while the second will run in
cluster mode (inside your cluster).
@@ -16,9 +17,9 @@ In order to follow along with this tutorial, you'll need to
have [Maven](https:/
In order to run our Pulsar Functions, we'll need to run a Pulsar cluster
locally first. The easiest way to do that is to run Pulsar in {% popover
standalone %} mode. Follow these steps to start up a standalone cluster:
```bash
-$ wget
https://github.com/streamlio/incubator-pulsar/releases/download/2.0.0-incubating-functions-preview/apache-pulsar-2.0.0-incubating-functions-preview-bin.tar.gz
-$ tar xvf apache-pulsar-2.0.0-incubating-functions-preview-bin.tar.gz
-$ cd apache-pulsar-2.0.0-incubating-functions-preview
+$ wget https://github.com/streamlio/incubator-pulsar/releases/download/v{{
site.preview_version }}/apache-pulsar-{{ site.preview_version }}-bin.tar.gz
+$ tar xvf apache-pulsar-{{ site.preview_version }}-bin.tar.gz
+$ cd apache-pulsar-{{ site.preview_version }}
$ bin/pulsar standalone \
--advertised-address 127.0.0.1
```
@@ -53,6 +54,14 @@ $ bin/pulsar-admin functions localrun \
--name exclamation
```
+{% include admonition.html title='Multiple input topics allowed'
type='success' content="
+In the example above, a single topic was specified using the `--inputs` flag.
You can also specify multiple input topics as a comma-separated list using the
same flag. Here's an example:
+
+```bash
+--inputs topic1,topic2
+```
+" %}
+
We can use the [`pulsar-client`](../../reference/CliTools#pulsar-client) CLI
tool to publish a message to the input topic:
```bash
@@ -222,34 +231,36 @@ Here, the `process` method defines the processing logic
of the Pulsar Function.
$ bin/pulsar-admin functions create \
--py reverse.py \
--className reverse \
- --inputs persistent://sample/standalone/ns1/input \
- --output persistent://sample/standalone/ns1/output \
+ --inputs persistent://sample/standalone/ns1/backwards \
+ --output persistent://sample/standalone/ns1/forwards \
--tenant sample \
--namespace ns1 \
- --name reverse
+ --name reverse
```
-If you see `Created successfully`, the function is ready to accept incoming
messages. Let's publish a string to the input topic:
+If you see `Created successfully`, the function is ready to accept incoming
messages. Let's listen for incoming messages on the output topic using the
[`pulsar-client consume`](../../reference/CliTools#pulsar-client-consume) command:
```bash
-$ bin/pulsar-client produce persistent://sample/standalone/ns1/input \
- --num-produce 1 \
- --messages "sdrawrof won si tub sdrawkcab saw gnirts sihT"
+$ bin/pulsar-client consume persistent://sample/standalone/ns1/forwards \
+ --subscription-name my-subscription \
+ --num-messages 0
```
-Now, let's pull in a message from the output topic:
+{% include admonition.html type="info" content="Setting the `--num-messages`
flag to 0 means that the consumer will listen on the topic indefinitely." %}
+
+At the moment, no messages are arriving on the output topic, so let's publish
one to the function's input topic using the [`pulsar-client
produce`](../../reference/CliTools#pulsar-client-produce) command:
```bash
-$ bin/pulsar-client consume persistent://sample/standalone/ns1/output \
- --subscription-name my-subscription \
- --num-messages 1
+$ bin/pulsar-client produce persistent://sample/standalone/ns1/backwards \
+ --num-produce 1 \
+ --messages "sdrawrof won si tub sdrawkcab saw gnirts sihT"
```
-You should see the reversed string in the log output:
+You should see the reversed string in the output:
```
----- got message -----
This string was backwards but is now forwards
```
-Once again, success! We created a brand new Pulsar Function, deployed it in
our Pulsar standalone cluster, and successfully published to the function's
input topic and consumed from its output topic.
+Once again, success! We created a brand new Pulsar Function, deployed it in
our Pulsar standalone cluster, successfully published to the function's input
topic, and finally consumed from its output topic.
diff --git a/site/index.html b/site/index.html
index 3b1eba960..59177f73e 100644
--- a/site/index.html
+++ b/site/index.html
@@ -48,9 +48,12 @@ <h2 class="text-center">Key features</h2>
{% for feature in site.data.features %}
<div class="col-sm-4 card">
<div class="card-block">
- <a href="/docs/latest/{{ feature.endpoint }}" >
- <h3 class="card-title">{{ feature.title }}</h3>
- </a>
+ <h3 class="card-title">
+ <a href="/docs/latest/{{ feature.endpoint }}">
+ {{ feature.title }}
+ </a>
+ {% if feature.new %} <span class="badge
badge-warning">New</span>{% endif %}
+ </h3>
<p>{{ feature.content | markdownify }}</p>
</div>
</div>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services