nwangtw closed pull request #3148: Document new streamlet operations in Heron website
URL: https://github.com/apache/incubator-heron/pull/3148
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/website/content/docs/concepts/streamlet-api.md b/website/content/docs/concepts/streamlet-api.md
index 4d5774bfb2..3891ae8a1d 100644
--- a/website/content/docs/concepts/streamlet-api.md
+++ b/website/content/docs/concepts/streamlet-api.md
@@ -77,6 +77,7 @@ In this diagram, the **source streamlet** is produced by a random generator that
The Heron Streamlet API is currently available for:
* [Java](/docs/developers/java/streamlet-api)
+* [Scala](/docs/developers/scala/streamlet-api)
### The Heron Streamlet API and topologies
@@ -147,7 +148,14 @@ Operation | Description
[union](#filter-operations) | Unifies two streamlets into one, without [windowing](#windowing) or modifying the elements of the two streamlets
[clone](#clone-operations) | Creates any number of identical copies of a streamlet
[transform](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) | Modify the elements from an incoming streamlet and update the topology's state
-[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a [time window](#windowing), and in accordance with a reduce function that you apply to all the accumulated values
+[keyBy](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet
+[reduceByKey](#reduce-by-key-operations) | Produces a streamlet of key-value pairs for each key, in accordance with a reduce function that you apply to all the accumulated values
+[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs for each key, within a [time window](#windowing) and in accordance with a reduce function that you apply to all the accumulated values
+[countByKey](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key
+[countByKeyAndWindow](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within a [time window](#windowing)
+[split](#split-operations) | Splits a streamlet into multiple streamlets with different ids
+[withStream](#with-stream-operations) | Selects a stream by id from a streamlet that contains multiple streams
+[applyOperator](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet
[join](#join-operations) | Joins two separate key-value streamlets into a single streamlet on a key, within a [time window](#windowing), and in accordance with a join function
[log](#log-operations) | Logs the final streamlet output of the processing graph to stdout
[toSink](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.
@@ -299,6 +307,60 @@ builder.newSource(() -> "Some string over and over");
.log();
```
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .keyBy(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (get the length of each word)
+ word -> word.length()
+ )
+ // The result is logged
+ .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value objects, each containing the extracted key and the value calculated by the reduce function.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .reduceByKey(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (each word appears only once, hence the value is always 1)
+ word -> 1,
+ // Reduce operation (a running sum)
+ (x, y) -> x + y
+ )
+ // The result is logged
+ .log();
+```
+
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -335,6 +397,100 @@ builder.newSource(() -> "Mary had a little lamb")
.log();
```
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKey(word -> word)
+ // The result is logged
+ .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](#windowing).
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKeyAndWindow(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Window configuration
+ WindowConfig.TumblingCountWindow(50)
+ )
+ // The result is logged
+ .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids by getting the corresponding stream ids from each item in the original streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Splits the stream into streams of long and short words
+ .split(splitter)
+ // Choose the stream of the short words
+ .withStream("short_word")
+ // The result is logged
+ .log();
+```
+
+### With stream operations
+
+With stream operations select one stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
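+#### Java example
+
+A minimal sketch that reuses the splitter from the [split](#split-operations) example above; the stream ids `long_word` and `short_word` are just the illustrative names used there:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Split the stream into streams of long and short words
+ .split(splitter)
+ // Select the stream of long words by its id
+ .withStream("long_word")
+ // The result is logged
+ .log();
+```
+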
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
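+// MyBolt is assumed to be an existing bolt implementation that you want to reuse as an operator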
+private class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Apply user defined operation
+ .applyOperator(new MyBoltOperator())
+ // The result is logged
+ .log();
+```
+
### Join operations
Join operations in the Streamlet API take two streamlets (a "left" and a "right" streamlet) and join them together:
diff --git a/website/content/docs/developers/java/streamlet-api.mmark b/website/content/docs/developers/java/streamlet-api.mmark
index e8adc6b0d1..a08dfa3dae 100644
--- a/website/content/docs/developers/java/streamlet-api.mmark
+++ b/website/content/docs/developers/java/streamlet-api.mmark
@@ -166,7 +166,14 @@ Operation | Description | Example
[`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
[`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
[`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer. | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs for each key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs for each key, within a time window and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within a time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different ids |
+[`withStream`](#with-stream-operations) | Selects a stream by id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
[`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
[`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
[`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -325,6 +332,56 @@ In this case, the resulting streamlet would consist of an indefinite stream with
> The effect of a join operation is to create a new streamlet *for each key*.
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .keyBy(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (get the length of each word)
+ word -> word.length()
+ )
+ // The result is logged
+ .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value objects, each containing the extracted key and the value calculated by the reduce function. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .reduceByKey(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (each word appears only once, hence the value is always 1)
+ word -> 1,
+ // Reduce operation (a running sum)
+ (x, y) -> x + y
+ )
+ // The result is logged
+ .log();
+```
+
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -359,6 +416,92 @@ Builder builder = Builder.newBuilder()
.log();
```
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKey(word -> word)
+ // The result is logged
+ .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKeyAndWindow(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Window configuration
+ WindowConfig.TumblingCountWindow(50)
+ )
+ // The result is logged
+ .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids by getting the corresponding stream ids from each item in the original streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Splits the stream into streams of long and short words
+ .split(splitter)
+ // Choose the stream of the short words
+ .withStream("short_word")
+ // The result is logged
+ .log();
+```
+
+### With stream operations
+
+With stream operations select one stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
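+As a minimal sketch, reusing the splitter from the [split](#split-operations) example above (the stream ids `long_word` and `short_word` are just the illustrative names used there), selecting the stream of long words looks like this:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Split the stream into streams of long and short words
+ .split(splitter)
+ // Select the stream of long words by its id
+ .withStream("long_word")
+ // The result is logged
+ .log();
+```
+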
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
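+// MyBolt is assumed to be an existing bolt implementation that you want to reuse as an operator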
+private class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Apply user defined operation
+ .applyOperator(new MyBoltOperator())
+ // The result is logged
+ .log();
+```
+
### Repartition operations
When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an example:
diff --git a/website/content/docs/developers/scala/streamlet-api.mmark b/website/content/docs/developers/scala/streamlet-api.mmark
index c031bd6ee4..ba3e3c2fa6 100644
--- a/website/content/docs/developers/scala/streamlet-api.mmark
+++ b/website/content/docs/developers/scala/streamlet-api.mmark
@@ -132,7 +132,14 @@ Operation | Description | Example
[`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
[`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
[`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs for each key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs for each key, within a time window and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within a time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different ids |
+[`withStream`](#with-stream-operations) | Selects a stream by id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
[`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
[`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
[`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -301,6 +308,55 @@ In this case, the resulting streamlet would consist of an indefinite stream with
> The effect of a `join` operation is to create a new streamlet *for each key*.
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ .keyBy[String, Int](
+ // Key extractor (in this case, each word acts as the key)
+ (word: String) => word,
+ // Value extractor (get the length of each word)
+ (word: String) => word.length
+ )
+ // The result is logged
+ .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value objects, each containing the extracted key and the value calculated by the reduce function. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ .reduceByKey[String, Int](
+ // Key extractor (in this case, each word acts as the key)
+ (word: String) => word,
+ // Value extractor (each word appears only once, hence the value is always 1)
+ (word: String) => 1,
+ // Reduce operation (a running sum)
+ (x: Int, y: Int) => x + y)
+ // The result is logged
+ .log();
+```
+
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -334,6 +390,88 @@ builder
.log();
```
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Count the number of occurrences of each word
+ .countByKey[String]((word: String) => word)
+ // The result is logged
+ .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Count the number of occurrences of each word within each time window
+ .countByKeyAndWindow[String](
+ (word: String) => word,
+ WindowConfig.TumblingCountWindow(50))
+ // The result is logged
+ .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids by getting the corresponding stream ids from each item in the original streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Split the stream into streams of long and short words
+ .split(Map(
+ "long_word" -> { word: String => word.length >= 4 },
+ "short_word" -> { word: String => word.length < 4 }
+ ))
+ .withStream("short_word")
+ // The result is logged
+ .log();
+```
+
+### With stream operations
+
+With stream operations select one stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
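+// MyBolt is assumed to be an existing bolt implementation that you want to reuse as an operator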
+private class MyBoltOperator extends MyBolt
+ with IStreamletOperator[String, String] {
+}
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Apply user defined operation
+ .applyOperator(new MyBoltOperator())
+ // The result is logged
+ .log();
+```
+
### Repartition operations
When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an example:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services