[docs] [streaming] Documentation updates for stream windowing

This closes #395


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30a2ea2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30a2ea2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30a2ea2

Branch: refs/heads/master
Commit: e30a2ea2fd5eb9f65e89e44e2f04458d1502e4d2
Parents: 3d10a23
Author: mbalassi <mbala...@apache.org>
Authored: Mon Feb 16 15:22:56 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 15:22:56 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md | 78 +++++++++++++++++++-------------------------
 1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e30a2ea2/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index e409b49..3d6b75e 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -34,7 +34,7 @@ Flink Streaming is an extension of the core Flink API for 
high-throughput, low-l
 Flink Streaming API
 -----------
 
-The Streaming API is currently part of the *addons* Maven project. All 
relevant classes are located in the *org.apache.flink.streaming* package.
+The Streaming API is currently part of the *flink-staging* Maven project. All 
relevant classes are located in the *org.apache.flink.streaming* package.
 
 Add the following dependency to your `pom.xml` to use the Flink Streaming.
 
@@ -42,7 +42,7 @@ Add the following dependency to your `pom.xml` to use the 
Flink Streaming.
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-core</artifactId>
-    <version>{{site.FLINK_VERSION_STABLE}}</version>
+    <version>{{site.FLINK_VERSION_SHORT}}</version>
 </dependency>
 ~~~
 
@@ -284,7 +284,7 @@ The Flink Streaming API supports different types of 
pre-defined aggregation oper
 
 Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, 
first)`, `maxBy(field, first)`
 
-With `sum`, `min`, and `max` for every incoming tuple the selected field is 
replaced with the current aggregated value. Fields can be selected using either 
field positions or field expressions (similarly to grouping).  
+With `sum`, `min`, and `max`, the selected field of every incoming tuple is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).
 
 With `minBy` and `maxBy` the output of the operator is the element with the 
current minimal or maximal value at the given field. If more components share 
the minimum or maximum value, the user can decide if the operator should return 
the first or last element. This can be set by the `first` boolean parameter.
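+
+The snippet below is a minimal sketch of this, assuming a `DataStream<Tuple2<String, Integer>>` named `dataStream` and the `maxBy(field, first)` / `minBy(field, first)` signatures listed above; the stream name and field positions are only illustrative.
+
+~~~java
+// Emit the element holding the current maximum of field 1;
+// 'true' returns the first element seen when several share the maximum.
+dataStream.maxBy(1, true);
+
+// The same with minBy, returning the last element on ties.
+dataStream.minBy(1, false);
+~~~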
 
@@ -300,14 +300,15 @@ The user can control the size (eviction) of the windows 
and the frequency of red
  * `dataStream.window(…).every(…).mapWindow(…).flatten()`
  * 
`dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()`
 
-The core abstraction of the Windowing semantics is the `WindowedDataStream` 
and the `StreamWindow`. The WindowedDataStream which is created when we call 
the `.window(…)` method of the DataStream, represents the windowed 
discretisation of the underlying stream. The user can think about it simply as 
a `DataStream<StreamWindow<T>>` where additional API functions are supplied to 
provide efficient transformations of individual windows. 
+The core abstractions of the windowing semantics are the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we call the `.window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows.
 
-The result of a window transformation is again a `WindowedDataStream` which 
can also be used to further transform the resulting windows. In this sense, 
window transformations define mapping from `StreamWindow -> StreamWindow’`.
+The result of a window transformation is again a `WindowedDataStream`, which can also be used to further transform the resulting windows. In this sense, window transformations define a mapping from stream windows to stream windows.
 
-The user have different ways of working further with a result of a window 
operation:
- * `windowedDataStream.flatten()` - Which streams out the results element wise 
and returns a `DataStream<T>` where T is the type of the underlying windowed 
stream
- * `windowedDataStream.getDiscretizedStream()` - Which returns a 
`DataStream<StreamWindow<T>>` for applying some advanced logic on the stream 
windows itself
- * Calling any window transformation further transforms the windows
+The user has different ways of using the result of a window operation, as shown in the sketch after this list:
+
+ * `windowedDataStream.flatten()` - streams the results element-wise and returns a `DataStream<T>` where T is the type of the underlying windowed stream
+ * `windowedDataStream.getDiscretizedStream()` - returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows themselves
+ * Calling any window transformation further transforms the windows, while preserving the windowing logic
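+
+A minimal sketch of the first two options, assuming a `DataStream<Tuple2<String, Integer>>` named `dataStream` and reusing the `Count` policy and `sum` aggregation shown elsewhere in this guide:
+
+~~~java
+// Element-wise view of the per-window sums
+DataStream<Tuple2<String, Integer>> sums =
+    dataStream.window(Count.of(100)).sum(1).flatten();
+
+// Window-wise view of the same result, one StreamWindow per trigger
+DataStream<StreamWindow<Tuple2<String, Integer>>> windowSums =
+    dataStream.window(Count.of(100)).sum(1).getDiscretizedStream();
+~~~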
 
 The next example would create windows that hold elements of the last 5 
seconds, and the user defined transformation would be executed on the windows 
every second (sliding the window by 1 second):
 
@@ -315,7 +316,7 @@ The next example would create windows that hold elements of 
the last 5 seconds,
 dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, 
TimeUnit.SECONDS))
 ~~~
 
-This approach is often referred to as policy based windowing. Different 
policies (count, time, etc.) can be mixed as well; for example to downsample 
our stream, a window that takes the latest 100 elements of the stream every 
minute is created as follows:
+This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well. For example, to downsample our stream, a window that takes the latest 100 elements of the stream every minute is created as follows:
 
 ~~~java
 dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES))
@@ -329,7 +330,7 @@ Several predefined policies are provided in the API, 
including delta-based, coun
  * `Count.of(…)`
  * `Delta.of(…)`
 
-For detailed description of these policies please refer to the javadocs.
+For a detailed description of these policies please refer to the [Javadocs](http://flink.apache.org/docs/latest/api/java/).
 
 #### Policy based windowing
 Policy based windowing is a highly flexible way to specify stream discretisation, also called windowing semantics. Two types of policies are used for such a specification:
@@ -351,16 +352,16 @@ By default most triggers can only trigger when a new 
element arrives. This might
 Time-based trigger and eviction policies can work with user defined `TimeStamp` implementations; these policies already cover most use cases.
  
 #### Reduce on windowed data streams
-The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation 
calls the user-defined `ReduceFunction` at every trigger on the records 
currently in the window. The user can also use the different pre-implemented 
streaming aggregations: `sum, min, max, minBy, maxBy`
+The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation calls the user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the different pre-implemented streaming aggregations such as `sum`, `min`, `max`, `minBy` and `maxBy`.
 
-A window reduce that sums the elements in the last minute with 10 seconds 
slide interval:
+The following is an example of a window reduce that sums the elements of the last minute with a 10 second slide interval:
 
 ~~~java
 dataStream.window(Time.of(1, 
TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field);
 ~~~
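+
+For transformations that are not covered by the pre-implemented aggregations, a user-defined `ReduceFunction` can be passed to `reduceWindow`. A minimal sketch, assuming a `DataStream<Tuple2<String, Integer>>` named `dataStream` (the field positions and the merge logic are only illustrative):
+
+~~~java
+dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10, TimeUnit.SECONDS))
+    .reduceWindow(new ReduceFunction<Tuple2<String, Integer>>() {
+        @Override
+        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
+            // Keep the key of the first element and sum the counts
+            return new Tuple2<String, Integer>(a.f0, a.f1 + b.f1);
+        }
+    });
+~~~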
 
 #### Map on windowed data streams
-The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation 
calls  `WindowMapFunction.mapWindow(…)` for each `StreamWindow` in the 
discretised stream providing access to all elements in the window through the 
iterable interface. At each function call the output `StreamWindow<O>` will 
consist of all the elements collected to the collector. This allows a 
straightforward way of mapping one stream window to another.
+The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation calls `mapWindow(…)` for each `StreamWindow` in the discretised stream, providing access to all elements in the window through the iterable interface. At each function call the output `StreamWindow<O>` will consist of all the elements collected to the collector. This allows a straightforward way of mapping one stream window to another.
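+
+A minimal sketch of such a window mapper, assuming the `mapWindow(Iterable<T> values, Collector<O> out)` signature implied above; the element type and the counting logic are only illustrative:
+
+~~~java
+windowedDataStream.mapWindow(new WindowMapFunction<Integer, Integer>() {
+    @Override
+    public void mapWindow(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+        // Emit a single element per window: the number of elements it contains
+        int count = 0;
+        for (Integer value : values) {
+            count++;
+        }
+        out.collect(count);
+    }
+});
+~~~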
 
 ~~~java
 windowedDataStream.mapWindow(windowMapFunction)
@@ -379,9 +380,7 @@ To get the maximal value for each key on the last 100 
elements (global) we use t
 dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
 ~~~
 
-Using this approach we took the last 100 elements, divided it into groups by 
key then applied the aggregation.
-
-To create fixed size windows for every key we need to reverse the order of the 
groupBy call. So to take the max for the last 100 elements in Each group:
+Using this approach we took the last 100 elements, divided them into groups by key, then applied the aggregation. To create fixed size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in each group:
 
 ~~~java
 dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
@@ -393,31 +392,20 @@ This will create separate windows for different keys and 
apply the trigger and e
 Using the `WindowedDataStream` abstraction we can apply several transformations one after another on the discretised stream without having to re-discretise it:
 
 ~~~java
-dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…).flatten()
+dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…)
+    .groupBy(secondKey).reduceWindow(…).flatten()
 ~~~
 
 The above call would create global windows of 1000 elements, group them by the first key and then apply a mapWindow transformation. The resulting windowed stream will then be grouped by the second key and further reduced. The results of the reduce transformation are then flattened.
 
-Notice here that we only defined the window size once at the beginning of the 
transformation. This means that anything that happens afterwards 
(`.groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) 
happens inside the 1000 element windows. Of course the mapWindow might reduce 
the number of elements but the idea is that each transformation still 
corresponds to the same 1000 elements in the original stream.
+Notice here that we only defined the window size once at the beginning of the 
transformation. This means that anything that happens afterwards 
(`.groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) 
happens inside the 1000 element windows. Of course the mapWindow might reduce 
the number of elements but the key idea is that each transformation still 
corresponds to the same 1000 elements in the original stream.
 
 #### Global vs local discretisation
-By default all window discretisation calls (`dataStream.window(…)`) define 
global windows meaning that a global window of count 100 will contain the last 
100 elements arrived at the discretisation operator in order. In most cases 
(except for Time) this means that the operator doing the actual discretisation 
needs to have a degree of parallelism of 1 to be able to correctly execute the 
discretisation logic. 
-
-Sometimes it is enough to create local discretisations, which allows the 
discretiser to run in parallel and apply the given discretisation logic at 
every discretiser instance. To allow local discretisation use the `.local()` 
method of the windowed data stream.
-
-Example:
-
-~~~java
-dataStream.window(Count.of(100)).maxBy(field);
-~~~
-
-This would create global windows of 100 elements (Count discretises with 
parallelism of 1) and return the record with the max value by the selected 
field, while
+By default all window discretisation calls (`dataStream.window(…)`) define global windows, meaning that a global window of count 100 will contain the last 100 elements that arrived at the discretisation operator, in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a degree of parallelism of 1 to be able to correctly execute the discretisation logic.
 
-~~~java
-dataStream.window(Count.of(100)).local().maxBy(field);
-~~~
+Sometimes it is sufficient to create local discretisations, which allows the 
discretiser to run in parallel and apply the given discretisation logic at 
every discretiser instance. To allow local discretisation use the `local()` 
method of the windowed data stream.
 
-this would create several count discretisers (as defined by the environment 
parallelism) and compute the max values accordingly.
+For example, `dataStream.window(Count.of(100)).maxBy(field)` would create global windows of 100 elements (Count discretises with parallelism of 1) and return the record with the max value by the selected field. Alternatively, `dataStream.window(Count.of(100)).local().maxBy(field)` would create several count discretisers (as defined by the environment parallelism) and compute the max values accordingly.
 
 
 ### Temporal database style operators
@@ -432,9 +420,9 @@ The following code shows a default Join transformation 
using field position keys
 
 ~~~java
 dataStream1.join(dataStream2)
-               .onWindow(windowing_params)
-               .where(key_in_first)
-               .equalTo(key_in_second);
+    .onWindow(windowing_params)
+    .where(key_in_first)
+    .equalTo(key_in_second);
 ~~~
 
 The Cross transformation combines two DataStreams into one DataStream. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product.
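+
+As a sketch only, assuming the Cross call mirrors the Join call shown above (the exact builder methods are an assumption here, please check the Javadocs for the precise signature):
+
+~~~java
+dataStream1.cross(dataStream2)
+    .onWindow(windowing_params);
+~~~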
@@ -527,7 +515,7 @@ For example to split even and odd numbers:
 
 ~~~java
 @Override
-Iterable<String> select(Integer value) {       
+Iterable<String> select(Integer value) {
 
     List<String> outputs = new ArrayList<String>();
 
@@ -622,7 +610,7 @@ For a detailed Java 8 Guide please refer to the [Java 8 
Programming Guide](java8
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream
-                                       .split(x -> 
Arrays.asList(String.valueOf(x % 2)));
+    .split(x -> Arrays.asList(String.valueOf(x % 2)));
 ~~~
 
 Operator Settings
@@ -646,7 +634,7 @@ env.setBufferTimeout(timeoutMillis);
 env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
 ~~~
 
-To maximise the throughput the user can call `.setBufferTimeout(-1)` which 
will remove the timeout and buffers will only be flushed when they are full.
+To maximise the throughput the user can call `setBufferTimeout(-1)` which will 
remove the timeout and buffers will only be flushed when they are full.
 To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
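+
+A short sketch of the two extremes, reusing the `env` object from the example above:
+
+~~~java
+// Throughput-oriented: remove the timeout, flush buffers only when they are full
+env.setBufferTimeout(-1);
+
+// Latency-oriented: flush buffers at most every 5 ms
+env.setBufferTimeout(5);
+~~~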
 
 
@@ -673,7 +661,7 @@ This connector provides access to data streams from [Apache 
Kafka](https://kafka
 #### Kafka Source
 A class providing an interface for receiving data from Kafka.
 
-The followings have to be provided for the `KafkaSource(..)` constructor in 
order:
+The following have to be provided for the `KafkaSource(…)` constructor, in order:
 
 1. The hostname
 2. The group name
@@ -895,7 +883,7 @@ After installing Docker an image can be pulled for each 
connector. Containers ca
 For the easiest set up create a jar with all the dependencies of the 
*flink-streaming-connectors* project.
 
 ~~~batch
-cd /PATH/TO/GIT/incubator-flink/flink-addons/flink-streaming-connectors
+cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
 mvn assembly:assembly
 ~~~
 
@@ -1012,14 +1000,14 @@ Now a terminal started running from the image with all 
the necessary configurati
 
 To have the latest version of Flink type:
 ~~~batch
-cd /git/incubator-flink/
+cd /git/flink/
 git pull
 ~~~
 
 Then build the code with:
 
 ~~~batch
-cd 
/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/
+cd /git/flink/flink-staging/flink-streaming/flink-streaming-connectors/
 mvn install -DskipTests
 ~~~
 
@@ -1047,4 +1035,4 @@ In the example there are to connectors. One that sends 
messages to Flume and one
 <DATE> INFO flume.FlumeTopology: String: <five> arrived from Flume
 ~~~
 
-[Back to top](#top)
\ No newline at end of file
+[Back to top](#top)
