sijie closed pull request #1791: [WIP] Pulsar Functions windowing documentation
URL: https://github.com/apache/incubator-pulsar/pull/1791
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index dc311bafaa..accc15c863 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -239,7 +239,7 @@ void processArguments() throws Exception {
         protected Integer slidingIntervalCount;
         @Parameter(names = "--slidingIntervalDurationMs", description = "")
         protected Long slidingIntervalDurationMs;
-        @Parameter(names = "--autoAck", description = "")
+        @Parameter(names = "--autoAck", description = "Whether the function will automatically acknowledge (ack) each message upon receipt")
         protected Boolean autoAck;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
index 189b2ca103..94c156381b 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
@@ -25,16 +25,9 @@
 import java.util.function.Function;
 
 @Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
+public class WindowFunction implements Function<Collection<Integer>, Integer> {
     @Override
     public Integer apply(Collection<Integer> integers) {
-
-        int sum = integers.stream().reduce(new BinaryOperator<Integer>() {
-            @Override
-            public Integer apply(Integer integer, Integer integer2) {
-                return integer + integer2;
-            }
-        }).get();
-        return sum;
+        return integers.stream().reduce(0, (x, y) -> x + y);
     }
 }
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 21806199f9..c09ca6170f 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -160,6 +160,17 @@ commands:
       description: The function's tenant
     - flags: --userConfig
      description: A user-supplied config value, set as a key/value pair. You can set multiple user config values.
+    - flags: --autoAck
+      description: Whether the function will automatically acknowledge (ack) each message upon receipt
+      default: true
+    - flags: --windowLengthCount
+      description: TODO
+    - flags: --windowLengthDurationMs
+      description: TODO
+    - flags: --slidingIntervalCount
+      description: TODO
+    - flags: --slidingIntervalDurationMs
+      description: TODO
   - name: create
     description: Creates a new Pulsar Function on the target infrastructure
     options:
diff --git a/site/docs/latest/functions/deployment.md b/site/docs/latest/functions/deployment.md
index 3dbe6dd5ec..f0986615e1 100644
--- a/site/docs/latest/functions/deployment.md
+++ b/site/docs/latest/functions/deployment.md
@@ -208,3 +208,27 @@ Pulsar supports three different [subscription types](../../getting-started/Conce
 
 Pulsar Functions can also be assigned a subscription type when you [create](#cluster-mode) them or run them [locally](#local-run). In cluster mode, the subscription can also be [updated](#updating) after the function has been created.
 -->
+
+## Window functions
+
+When running Pulsar Functions in either [cluster](#cluster-mode) or [local run](#local-run) mode, you can create [**window functions**](../overview#windows) that process collections of messages accumulated over a specified time window.
+
+This CLI command, for example, would run a window function with a count-based window of 100 messages:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --jar my-functions.jar \
+  --className org.example.WordCountFunction \
+  --windowLengthCount 100
+
+```
+
+
+
+Flag | Meaning | Default
+:----|:--------|:-------
+`--windowLengthCount` | |
+`--windowLengthDurationMs` | |
+`--slidingIntervalCount` | |
+`--slidingIntervalDurationMs` | |
+
diff --git a/site/docs/latest/functions/overview.md b/site/docs/latest/functions/overview.md
index a904f817a8..0172b44a0a 100644
--- a/site/docs/latest/functions/overview.md
+++ b/site/docs/latest/functions/overview.md
@@ -422,4 +422,91 @@ Pulsar Functions that use the [Pulsar Functions SDK](#sdk) can publish metrics t
 
 ## State storage
 
-Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local {% popover standalone %} installations, include a deployment of BookKeeper {% popover bookies %}.
\ No newline at end of file
+Pulsar Functions use [Apache BookKeeper](https://bookkeeper.apache.org) as a state storage interface. All Pulsar installations, including local {% popover standalone %} installations, include a deployment of BookKeeper {% popover bookies %}.
+
+## Window functions {#windows}
+
+{% include admonition.html type="warning" content="Window operations for Pulsar Functions are currently available for Java only. Python support is coming soon." %}
+
+**Window operations** process the messages that accumulate within a specified window, defined either by a time span or by a message count, rather than handling messages one at a time. Here are some example window operations:
+
+* Counting how many clicks a website has received in the last 10 minutes
+* Determining which product in a product line is the best seller within the last 1000 purchases
+
+Window operations are of two basic types, depending on whether or not the windows overlap with one another:
+
+* [Sliding windows](#sliding) are time windows that overlap
+* [Tumbling windows](#tumbling) are exclusive, non-overlapping time windows
+
+### Sliding windows {#sliding}
+
+{% include figure.html src="/img/sliding-window.png" %}
+
+### Tumbling windows {#tumbling}
+
+{% include figure.html src="/img/tumbling-window.png" %}
+
+### Windowing API
+
+{% include admonition.html type="success" title="Windowing supported in both Pulsar Function APIs" content="Window operations are supported in both the [language-native](#language-native) API and the [Pulsar Functions SDK](#sdk)." %}
+
+Window operations take a collection of some data type and reduce it to a single result, which may or may not be of the same type. Let's say that string objects are being published on a Pulsar {% popover topic %} called `words`. The example Java function below would lowercase each word, winnow the word stream down to a collection of unique words, and then count the total number of unique words encountered:
+
+```java
+import java.util.Collection;
+import java.util.function.Function;
+
+public class CountUniqueWordsFunction implements Function<Collection<String>, Long> {
+    @Override
+    public Long apply(Collection<String> strings) {
+        return strings
+                .stream()
+                .map(String::toLowerCase)
+                .distinct()
+                .count();
+    }
+}
+```
+
+Here's an example command for running the function in [cluster mode](../deployment#cluster-mode) with a [count-based tumbling window](../overview#tumbling) of 100 messages:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --jar my-functions.jar \
+  --className org.example.CountUniqueWordsFunction \
+  --inputs words \
+  --output count \
+  --windowLengthCount 100 \
+  # Other function configs
+```
+
+This would run the function with a [count-based sliding window](../overview#sliding) of 100 messages with a sliding interval of 50:
+
+```bash
+$ bin/pulsar-admin functions create \
+  --jar my-functions.jar \
+  --className org.example.CountUniqueWordsFunction \
+  --inputs words \
+  --output count \
+  --windowLengthCount 100 \
+  --slidingIntervalCount 50 \
+  # Other function configs
+```
+
+Window operations can be useful for things like reduce operations that collapse collections into a single value on the basis of an item-by-item operation. Here's an example:
+
+```java
+import java.util.Collection;
+import java.util.function.Function;
+
+public class SummationFunction implements Function<Collection<Integer>, Integer> {
+    @Override
+    public Integer apply(Collection<Integer> integers) {
+        return integers.stream().reduce(0, (x, y) -> x + y);
+    }
+}
+```
+
+In `SummationFunction`, the integers in the `integers` collection are added together to produce a total sum (or 0 if no values are supplied within the window).
+
+{% include admonition.html type="success" title="Windowing vs. state management" content="It may be useful to think of window functions as normal Pulsar Functions but with the crucial difference that you don't need to manually use [state storage](#state-storage) to achieve results across many operations. Windows are essentially a convenience layer over normal Pulsar Functions." %}
\ No newline at end of file
diff --git a/site/img/sliding-window.png b/site/img/sliding-window.png
new file mode 100644
index 0000000000..13e9a12065
Binary files /dev/null and b/site/img/sliding-window.png differ
diff --git a/site/img/tumbling-window.png b/site/img/tumbling-window.png
new file mode 100644
index 0000000000..20f8da5064
Binary files /dev/null and b/site/img/tumbling-window.png differ
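
Note on the windowing docs in this diff: the "Sliding windows" and "Tumbling windows" sections currently contain only figures, so a small illustration of the difference may help readers. The sketch below is not Pulsar's implementation. It is a simplified, standalone model of count-based windows in which `CountWindowSketch` and `applyWindows` are hypothetical names, only full windows are emitted, and the two integer arguments play the roles of `--windowLengthCount` and `--slidingIntervalCount`. A tumbling window is modeled as the case where the interval equals the window length.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper for illustration only; not part of the Pulsar codebase.
public class CountWindowSketch {

    // Cuts `items` into count-based windows of `windowLength` elements,
    // advancing by `slidingInterval` elements each time, and applies `fn`
    // to every full window. Partial trailing windows are skipped (an
    // assumption of this sketch, not a statement about Pulsar's behavior).
    public static <T, R> List<R> applyWindows(List<T> items,
                                              int windowLength,
                                              int slidingInterval,
                                              Function<Collection<T>, R> fn) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("window length and interval must be positive");
        }
        List<R> results = new ArrayList<>();
        for (int start = 0; start + windowLength <= items.size(); start += slidingInterval) {
            results.add(fn.apply(items.subList(start, start + windowLength)));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        // Same reduction as SummationFunction in the docs above.
        Function<Collection<Integer>, Integer> sum =
                ints -> ints.stream().reduce(0, (x, y) -> x + y);

        // Tumbling: interval == length, so windows never overlap.
        // Windows: [1,2,3,4], [5,6,7,8]
        System.out.println(applyWindows(stream, 4, 4, sum)); // prints [10, 26]

        // Sliding: interval < length, so consecutive windows share elements.
        // Windows: [1,2,3,4], [3,4,5,6], [5,6,7,8]
        System.out.println(applyWindows(stream, 4, 2, sum)); // prints [10, 18, 26]
    }
}
```

With a window length of 4 and an interval of 2, each message contributes to two results, which is the overlap the sliding-window figure is meant to convey.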


 
