[docs] [streaming] Documentation updates for stream windowing

This closes #395

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30a2ea2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30a2ea2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30a2ea2

Branch: refs/heads/master
Commit: e30a2ea2fd5eb9f65e89e44e2f04458d1502e4d2
Parents: 3d10a23
Author: mbalassi <mbala...@apache.org>
Authored: Mon Feb 16 15:22:56 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 15:22:56 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md | 78 +++++++++++++++++++----------------------
 1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e30a2ea2/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index e409b49..3d6b75e 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md

@@ -34,7 +34,7 @@ Flink Streaming is an extension of the core Flink API for high-throughput, low-l
Flink Streaming API
-----------

-The Streaming API is currently part of the *addons* Maven project. All relevant classes are located in the *org.apache.flink.streaming* package.
+The Streaming API is currently part of the *flink-staging* Maven project. All relevant classes are located in the *org.apache.flink.streaming* package.

Add the following dependency to your `pom.xml` to use Flink Streaming.

@@ -42,7 +42,7 @@ Add the following dependency to your `pom.xml` to use Flink Streaming.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
- <version>{{site.FLINK_VERSION_STABLE}}</version>
+ <version>{{site.FLINK_VERSION_SHORT}}</version>
</dependency>
~~~

@@ -284,7 +284,7 @@ The Flink Streaming API supports different types of pre-defined aggregation oper

Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, first)`, `maxBy(field, first)`

-With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).
+With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).

With `minBy` and `maxBy` the output of the operator is the element with the current minimal or maximal value at the given field. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the `first` boolean parameter.

@@ -300,14 +300,15 @@ The user can control the size (eviction) of the windows and the frequency of red
 * `dataStream.window(…).every(…).mapWindow(…).flatten()`
 * `dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()`

-The core abstraction of the Windowing semantics is the `WindowedDataStream` and the `StreamWindow`. The WindowedDataStream which is created when we call the `.window(…)` method of the DataStream, represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows.
+The core abstractions of the Windowing semantics are the `WindowedDataStream` and the `StreamWindow`. The `WindowedDataStream` is created when we call the `.window(…)` method of the DataStream and represents the windowed discretisation of the underlying stream. The user can think about it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows.

-The result of a window transformation is again a `WindowedDataStream` which can also be used to further transform the resulting windows. In this sense, window transformations define mapping from `StreamWindow -> StreamWindow`.
+The result of a window transformation is again a `WindowedDataStream` which can also be used to further transform the resulting windows. In this sense, window transformations define a mapping from stream windows to stream windows.

-The user have different ways of working further with a result of a window operation:
- * `windowedDataStream.flatten()` - Which streams out the results element wise and returns a `DataStream<T>` where T is the type of the underlying windowed stream
- * `windowedDataStream.getDiscretizedStream()` - Which returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows itself
- * Calling any window transformation further transforms the windows
+The user has different ways of using the result of a window operation:
+
+ * `windowedDataStream.flatten()` - streams the results element-wise and returns a `DataStream<T>` where T is the type of the underlying windowed stream
+ * `windowedDataStream.getDiscretizedStream()` - returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows themselves
+ * Calling any window transformation further transforms the windows, while preserving the windowing logic

The next example would create windows that hold elements of the last 5 seconds, and the user defined transformation would be executed on the windows every second (sliding the window by 1 second):

@@ -315,7 +316,7 @@ The next example would create windows that hold elements of the last 5 seconds,
dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
~~~

-This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well; for example to downsample our stream, a window that takes the latest 100 elements of the stream every minute is created as follows:
+This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well. For example, to downsample our stream, a window that takes the latest 100 elements of the stream every minute is created as follows:

~~~java
dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES))
~~~

@@ -329,7 +330,7 @@ Several predefined policies are provided in the API, including delta-based, coun
 * `Count.of(…)`
 * `Delta.of(…)`

-For detailed description of these policies please refer to the javadocs.
+For a detailed description of these policies please refer to the [Javadocs](http://flink.apache.org/docs/latest/api/java/).

#### Policy based windowing
The policy based windowing is a highly flexible way to specify stream discretisation, also called windowing semantics. Two types of policies are used for such a specification:

@@ -351,16 +352,16 @@ By default most triggers can only trigger when a new element arrives. This might
Time-based trigger and eviction policies can work with user defined `TimeStamp` implementations; these policies already cover most use cases.

#### Reduce on windowed data streams
-The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation calls the user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the different pre-implemented streaming aggregations: `sum, min, max, minBy, maxBy`
+The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation calls the user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the different pre-implemented streaming aggregations such as `sum, min, max, minBy` and `maxBy`.

-A window reduce that sums the elements in the last minute with 10 seconds slide interval:
+The following is an example of a window reduce that sums the elements in the last minute with a 10 second slide interval:

~~~java
dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10, TimeUnit.SECONDS)).sum(field);
~~~

#### Map on windowed data streams
-The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation calls `WindowMapFunction.mapWindow(…)` for each `StreamWindow` in the discretised stream providing access to all elements in the window through the iterable interface. At each function call the output `StreamWindow<O>` will consist of all the elements collected to the collector. This allows a straightforward way of mapping one stream window to another.
+The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation calls `mapWindow(…)` for each `StreamWindow` in the discretised stream providing access to all elements in the window through the iterable interface. At each function call the output `StreamWindow<O>` will consist of all the elements collected to the collector. This allows a straightforward way of mapping one stream window to another.

~~~java
windowedDataStream.mapWindow(windowMapFunction)
~~~

@@ -379,9 +380,7 @@ To get the maximal value for each key on the last 100 elements (global) we use t
dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
~~~

-Using this approach we took the last 100 elements, divided it into groups by key then applied the aggregation.
-
-To create fixed size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in Each group:
+Using this approach we took the last 100 elements, divided them into groups by key and then applied the aggregation. To create fixed size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in each group:

~~~java
dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
~~~

@@ -393,31 +392,20 @@ This will create separate windows for different keys and apply the trigger and e
Using the `WindowedDataStream` abstraction we can apply several transformations one after another on the discretised streams without having to re-discretise it:

~~~java
-dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…).flatten()
+dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…)
+    .groupBy(secondKey).reduceWindow(…).flatten()
~~~

The above call would create global windows of 1000 elements, group it by the first key and then apply a mapWindow transformation. The resulting windowed stream will then be grouped by the second key and further reduced. The results of the reduce transformation are then flattened.

-Notice here that we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`.groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the idea is that each transformation still corresponds to the same 1000 elements in the original stream.
+Notice here that we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`.groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements, but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.

#### Global vs local discretisation
-By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a degree of parallelism of 1 to be able to correctly execute the discretisation logic.
-
-Sometimes it is enough to create local discretisations, which allows the discretiser to run in parallel and apply the given discretisation logic at every discretiser instance. To allow local discretisation use the `.local()` method of the windowed data stream.
-
-Example:
-
-~~~java
-dataStream.window(Count.of(100)).maxBy(field);
-~~~
-
-This would create global windows of 100 elements (Count discretises with parallelism of 1) and return the record with the max value by the selected field, while
+By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a degree of parallelism of 1 to be able to correctly execute the discretisation logic.

-~~~java
-dataStream.window(Count.of(100)).local().maxBy(field);
-~~~
+Sometimes it is sufficient to create local discretisations, which allows the discretiser to run in parallel and apply the given discretisation logic at every discretiser instance. To allow local discretisation use the `local()` method of the windowed data stream.

-this would create several count discretisers (as defined by the environment parallelism) and compute the max values accordingly.
+For example, `dataStream.window(Count.of(100)).maxBy(field)` would create global windows of 100 elements (Count discretises with parallelism of 1) and return the record with the max value by the selected field. Alternatively, `dataStream.window(Count.of(100)).local().maxBy(field)` would create several count discretisers (as defined by the environment parallelism) and compute the max values accordingly.

### Temporal database style operators

@@ -432,9 +420,9 @@ The following code shows a default Join transformation using field position keys
~~~java
dataStream1.join(dataStream2)
- .onWindow(windowing_params)
- .where(key_in_first)
- .equalTo(key_in_second);
+    .onWindow(windowing_params)
+    .where(key_in_first)
+    .equalTo(key_in_second);
~~~

The Cross transformation combines two DataStreams into one DataStream. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product.

@@ -527,7 +515,7 @@ For example to split even and odd numbers:
~~~java
@Override
-Iterable<String> select(Integer value) {
+Iterable<String> select(Integer value) {

List<String> outputs = new ArrayList<String>();

@@ -622,7 +610,7 @@ For a detailed Java 8 Guide please refer to the [Java 8 Programming Guide](java8
~~~java
SplitDataStream<Integer> split = someDataStream
- .split(x -> Arrays.asList(String.valueOf(x % 2)));
+    .split(x -> Arrays.asList(String.valueOf(x % 2)));
~~~

Operator Settings

@@ -646,7 +634,7 @@ env.setBufferTimeout(timeoutMillis);
env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
~~~

-To maximise the throughput the user can call `.setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full.
+To maximise the throughput, the user can call `setBufferTimeout(-1)`, which will remove the timeout so that buffers are only flushed when they are full.

To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.

@@ -673,7 +661,7 @@ This connector provides access to data streams from [Apache Kafka](https://kafka
#### Kafka Source
A class providing an interface for receiving data from Kafka.

-The followings have to be provided for the `KafkaSource(..)` constructor in order:
+The following have to be provided for the `KafkaSource(…)` constructor, in order:

1. The hostname
2. The group name

@@ -895,7 +883,7 @@ After installing Docker an image can be pulled for each connector. Containers ca
For the easiest setup, create a jar with all the dependencies of the *flink-streaming-connectors* project.

~~~batch
-cd /PATH/TO/GIT/incubator-flink/flink-addons/flink-streaming-connectors
+cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly
~~~

@@ -1012,14 +1000,14 @@ Now a terminal started running from the image with all the necessary configurati
To have the latest version of Flink type:

~~~batch
-cd /git/incubator-flink/
+cd /git/flink/
git pull
~~~

Then build the code with:

~~~batch
-cd /git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/
+cd /git/flink/flink-staging/flink-streaming/flink-streaming-connectors/
mvn install -DskipTests
~~~

@@ -1047,4 +1035,4 @@ In the example there are two connectors. One that sends messages to Flume and one
<DATE> INFO flume.FlumeTopology: String: <five> arrived from Flume
~~~

-[Back to top](#top)
\ No newline at end of file
+[Back to top](#top)