sijie closed pull request #1791: [WIP] Pulsar Functions windowing documentation
URL: https://github.com/apache/incubator-pulsar/pull/1791
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index dc311bafaa..accc15c863 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -239,7 +239,7 @@ void processArguments() throws Exception {
protected Integer slidingIntervalCount;
@Parameter(names = "--slidingIntervalDurationMs", description = "")
protected Long slidingIntervalDurationMs;
- @Parameter(names = "--autoAck", description = "")
+ @Parameter(names = "--autoAck", description = "Whether the function will automatically acknowledge (ack) each message upon receipt")
protected Boolean autoAck;
protected FunctionConfig functionConfig;
protected String userCodeFile;
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
index 189b2ca103..94c156381b 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
@@ -25,16 +25,9 @@
import java.util.function.Function;
@Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
+public class WindowFunction implements Function<Collection<Integer>, Integer> {
@Override
public Integer apply(Collection<Integer> integers) {
-
- int sum = integers.stream().reduce(new BinaryOperator<Integer>() {
- @Override
- public Integer apply(Integer integer, Integer integer2) {
- return integer + integer2;
- }
- }).get();
- return sum;
+ return integers.stream().reduce(0, (x, y) -> x + y);
}
}
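The `WindowFunction` change above has one behavioral consequence. The removed code called `reduce(BinaryOperator)` and then `.get()` on the resulting `Optional`, which throws `NoSuchElementException` for an empty collection. `reduce(0, ...)` returns `0` instead. The standalone sketch below contrasts the two approaches. The class name `ReduceOnEmptyWindow` is hypothetical, and the old anonymous `BinaryOperator` is written as an equivalent lambda. This PR does not say whether the Pulsar runtime ever passes an empty window to a function.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.NoSuchElementException;

// Hypothetical standalone class contrasting the old and new WindowFunction logic.
public class ReduceOnEmptyWindow {

    // Old approach: reduce without an identity yields an Optional;
    // get() throws NoSuchElementException when the collection is empty.
    static Integer sumWithoutIdentity(Collection<Integer> integers) {
        return integers.stream().reduce((x, y) -> x + y).get();
    }

    // New approach: reduce with identity 0 yields 0 for an empty collection.
    static Integer sumWithIdentity(Collection<Integer> integers) {
        return integers.stream().reduce(0, (x, y) -> x + y);
    }

    public static void main(String[] args) {
        Collection<Integer> window = Arrays.asList(1, 2, 3, 4);
        System.out.println(sumWithIdentity(window));    // 10
        System.out.println(sumWithoutIdentity(window)); // 10

        Collection<Integer> empty = Collections.<Integer>emptyList();
        System.out.println(sumWithIdentity(empty));     // 0
        try {
            sumWithoutIdentity(empty);
        } catch (NoSuchElementException e) {
            System.out.println("old approach fails on an empty window: " + e);
        }
    }
}
```

Both versions agree on non-empty windows. Only the empty case differs.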
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 21806199f9..c09ca6170f 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -160,6 +160,17 @@ commands:
description: The function's tenant
- flags: --userConfig
description: A user-supplied config value, set as a key/value pair. You can set multiple user config values.
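+ - flags: --autoAck
+ description: Whether the function will automatically acknowledge (ack) each message upon receipt
+ default: true
+ - flags: --windowLengthCount
+ description: The number of messages per window
+ - flags: --windowLengthDurationMs
+ description: The time duration of the window, in milliseconds
+ - flags: --slidingIntervalCount
+ description: The number of messages after which the window slides
+ - flags: --slidingIntervalDurationMs
+ description: The time duration after which the window slides, in milliseconds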
- name: create
description: Creates a new Pulsar Function on the target infrastructure
options:
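The count-based window flags documented above are easiest to picture on a concrete message sequence. The following Java sketch is an illustrative model only, not Pulsar's implementation. It assumes that a window is evaluated each time `slidingIntervalCount` new messages have arrived, and that the window holds the most recent `windowLengthCount` messages (fewer at the very start of the stream). Under that assumption, setting the interval equal to the length gives a tumbling window. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of count-based windows, not Pulsar's windowing implementation.
public class CountWindowSketch {

    // windowLength    plays the role of --windowLengthCount
    // slidingInterval plays the role of --slidingIntervalCount
    // (pass slidingInterval == windowLength for a tumbling window)
    static <T> List<List<T>> windows(List<T> messages, int windowLength, int slidingInterval) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("window length and sliding interval must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        // Evaluate a window each time another slidingInterval messages have arrived.
        for (int end = slidingInterval; end <= messages.size(); end += slidingInterval) {
            // Keep at most the windowLength most recent messages.
            int start = Math.max(0, end - windowLength);
            result.add(new ArrayList<>(messages.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> messages = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // Tumbling: [[1, 2, 3, 4], [5, 6, 7, 8]]
        System.out.println(windows(messages, 4, 4));
        // Sliding: [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6], [5, 6, 7, 8]]
        System.out.println(windows(messages, 4, 2));
    }
}
```

In the sliding case, consecutive windows share messages. In the tumbling case, each message lands in exactly one window.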
diff --git a/site/docs/latest/functions/deployment.md b/site/docs/latest/functions/deployment.md
index 3dbe6dd5ec..f0986615e1 100644
--- a/site/docs/latest/functions/deployment.md
+++ b/site/docs/latest/functions/deployment.md
@@ -208,3 +208,27 @@ Pulsar supports three different [subscription types](../../getting-started/Conce
Pulsar Functions can also be assigned a subscription type when you [create](#cluster-mode) them or run them [locally](#local-run). In cluster mode, the subscription can also be [updated](#updating) after the function has been created.
-->
+
+## Window functions
+
+When running Pulsar Functions in either [cluster](#cluster-mode) or [local run](#local-run) mode, you can create [**window functions**](../overview#windows) that process collections of messages accumulated over a specified window, defined either by a number of messages or by a time duration.
+
+This CLI command, for example, would run a window function with a count-based tumbling window of 100 messages:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --jar my-functions.jar \
+ --className org.example.WordCountFunction \
+ --windowLengthCount 100 \
+ # Other function configs
+```
+
+The following flags configure the window:
+
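+Flag | Meaning | Default
+:----|:--------|:-------
+`--windowLengthCount` | The number of messages per window |
+`--windowLengthDurationMs` | The time duration of the window, in milliseconds |
+`--slidingIntervalCount` | The number of messages after which the window slides |
+`--slidingIntervalDurationMs` | The time duration after which the window slides, in milliseconds |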
+
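The duration-based flags can be pictured the same way. The sketch below groups timestamped messages into tumbling windows whose length plays the role of `windowLengthDurationMs`. The `TimeWindowSketch` and `TimedMessage` types are hypothetical. Aligning windows to multiples of the window length is an assumption made for simplicity: this PR does not say which clock Pulsar uses for time windows or how window boundaries are aligned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of duration-based tumbling windows, not Pulsar's implementation.
public class TimeWindowSketch {

    // Hypothetical message type: a value plus a timestamp in milliseconds.
    static final class TimedMessage<T> {
        final long timestampMs;
        final T value;

        TimedMessage(long timestampMs, T value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Buckets messages into non-overlapping windows of windowLengthMs
    // (the role played by --windowLengthDurationMs). Windows are assumed to be
    // aligned to multiples of windowLengthMs: [0, len), [len, 2 * len), ...
    static <T> Map<Long, List<T>> tumblingWindows(List<TimedMessage<T>> messages, long windowLengthMs) {
        if (windowLengthMs <= 0) {
            throw new IllegalArgumentException("window length must be positive");
        }
        Map<Long, List<T>> windows = new TreeMap<>(); // key: window start time, sorted ascending
        for (TimedMessage<T> message : messages) {
            long windowStart = Math.floorDiv(message.timestampMs, windowLengthMs) * windowLengthMs;
            windows.computeIfAbsent(windowStart, start -> new ArrayList<>()).add(message.value);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<TimedMessage<String>> clicks = Arrays.asList(
                new TimedMessage<String>(100L, "a"),
                new TimedMessage<String>(900L, "b"),
                new TimedMessage<String>(1000L, "c"),
                new TimedMessage<String>(2500L, "d"));
        // With 1000 ms windows this prints {0=[a, b], 1000=[c], 2000=[d]}
        System.out.println(tumblingWindows(clicks, 1000L));
    }
}
```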
diff --git a/site/docs/latest/functions/overview.md b/site/docs/latest/functions/overview.md
index a904f817a8..0172b44a0a 100644
--- a/site/docs/latest/functions/overview.md
+++ b/site/docs/latest/functions/overview.md
@@ -422,4 +422,91 @@ Pulsar Functions that use the [Pulsar Functions SDK](#sdk) can publish metrics t
## State storage
-Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local {% popover standalone %} installations, include a deployment of BookKeeper {% popover bookies %}.
\ No newline at end of file
+Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local {% popover standalone %} installations, include a deployment of BookKeeper {% popover bookies %}.
+
+## Window functions {#windows}
+
+{% include admonition.html type="warning" content="Window operations for Pulsar Functions are currently available for Java only. Python support is coming soon." %}
+
+**Window operations** process collections of messages gathered within a specified window, defined either by a time frame or by a number of messages, rather than processing each message individually. Here are some example window operations:
+
+* Counting how many clicks a website has received in the last 10 minutes
+* Determining which product in a product line is the best seller within the last 1000 purchases
+
+Window operations are of two basic types, depending on whether or not the windows overlap with one another:
+
+* [Sliding windows](#sliding) are windows that overlap
+* [Tumbling windows](#tumbling) are exclusive, non-overlapping windows
+
+### Sliding windows {#sliding}
+
+{% include figure.html src="/img/sliding-window.png" %}
+
+### Tumbling windows {#tumbling}
+
+{% include figure.html src="/img/tumbling-window.png" %}
+
+### Windowing API
+
+{% include admonition.html type="success" title="Windowing supported in both Pulsar Function APIs" content="Window operations are supported in both the [language-native](#language-native) API and the [Pulsar Functions SDK](#sdk)." %}
+
+Window operations take a collection of items and "flatten" it into a single output value, which need not be of the same type as the items. Let's say that string objects are being published on a Pulsar {% popover topic %} called `words`. The example Java function below would lowercase each word, winnow the word stream down to a collection of unique words, and then count the total number of unique words in the window:
+
+```java
+import java.util.Collection;
+import java.util.function.Function;
+
+public class CountUniqueWordsFunction implements Function<Collection<String>, Long> {
+ @Override
+ public Long apply(Collection<String> strings) {
+ return strings
+ .stream()
+ .map(String::toLowerCase)
+ .distinct()
+ .count();
+ }
+}
+```
+
+Here's an example command for running the function in [cluster mode](../deployment#cluster-mode) with a [count-based tumbling window](../overview#tumbling) of 100 messages:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --jar my-functions.jar \
+ --className org.example.CountUniqueWordsFunction \
+ --inputs words \
+ --output count \
+ --windowLengthCount 100 \
+ # Other function configs
+```
+
+This would run the function with a [count-based sliding window](../overview#sliding) of 100 messages with a sliding interval of 50 messages:
+
+```bash
+$ bin/pulsar-admin functions create \
+ --jar my-functions.jar \
+ --className org.example.CountUniqueWordsFunction \
+ --inputs words \
+ --output count \
+ --windowLengthCount 100 \
+ --slidingIntervalCount 50 \
+ # Other function configs
+```
+
+Window operations can be useful for things like reduce operations that collapse collections into a single value on the basis of an item-by-item operation. Here's an example:
+
+```java
+import java.util.Collection;
+import java.util.function.Function;
+
+public class SummationFunction implements Function<Collection<Integer>, Integer> {
+ @Override
+ public Integer apply(Collection<Integer> integers) {
+ return integers.stream().reduce(0, (x, y) -> x + y);
+ }
+}
+```
+
+In `SummationFunction`, each integer in the `integers` collection is added to a running total that starts at 0, producing the sum for the window (or 0 if no values are supplied within the window).
+
+{% include admonition.html type="success" title="Windowing vs. state management" content="It may be useful to think of window functions as normal Pulsar Functions but with the crucial difference that you don't need to manually use [state storage](#state-storage) to achieve results across many operations. Windows are essentially a convenience layer over normal Pulsar Functions." %}
\ No newline at end of file
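`CountUniqueWordsFunction` and `SummationFunction` implement plain `java.util.function.Function`, the language-native API. Their windowing logic can therefore be exercised by calling `apply` on a hand-built collection, with no Pulsar cluster involved. The harness class name below is hypothetical, and the collections merely stand in for the windows Pulsar would assemble. This is a quick local check, not a Pulsar testing utility.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;

// Hypothetical local harness; the collections stand in for Pulsar-assembled windows.
public class WindowFunctionLocalCheck {

    // Same logic as CountUniqueWordsFunction in the docs above.
    static class CountUniqueWordsFunction implements Function<Collection<String>, Long> {
        @Override
        public Long apply(Collection<String> strings) {
            return strings.stream().map(String::toLowerCase).distinct().count();
        }
    }

    // Same logic as SummationFunction in the docs above.
    static class SummationFunction implements Function<Collection<Integer>, Integer> {
        @Override
        public Integer apply(Collection<Integer> integers) {
            return integers.stream().reduce(0, (x, y) -> x + y);
        }
    }

    public static void main(String[] args) {
        Collection<String> words = Arrays.asList("Apple", "apple", "Pear", "PEAR", "plum");
        System.out.println(new CountUniqueWordsFunction().apply(words)); // 3

        System.out.println(new SummationFunction().apply(Arrays.asList(5, 10, 15))); // 30
        System.out.println(new SummationFunction().apply(Collections.<Integer>emptyList())); // 0
    }
}
```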
diff --git a/site/img/sliding-window.png b/site/img/sliding-window.png
new file mode 100644
index 0000000000..13e9a12065
Binary files /dev/null and b/site/img/sliding-window.png differ
diff --git a/site/img/tumbling-window.png b/site/img/tumbling-window.png
new file mode 100644
index 0000000000..20f8da5064
Binary files /dev/null and b/site/img/tumbling-window.png differ