Repository: storm
Updated Branches:
  refs/heads/master 98dbdcdb5 -> d5b80fcca

STORM-1652 Added trident windowing API documentation to Trident API doc.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60f5dfbc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60f5dfbc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60f5dfbc

Branch: refs/heads/master
Commit: 60f5dfbc56d293325472459e16cec537d59192a3
Parents: 31db7dc
Author: Satish Duggana <sdugg...@hortonworks.com>
Authored: Mon Mar 28 12:48:40 2016 +0530
Committer: Satish Duggana <sdugg...@hortonworks.com>
Committed: Mon Mar 28 15:09:25 2016 +0530

----------------------------------------------------------------------
 docs/Trident-API-Overview.md | 100 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 100 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/60f5dfbc/docs/Trident-API-Overview.md
----------------------------------------------------------------------
diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md
index d5893cc..5a0ffbc 100644
--- a/docs/Trident-API-Overview.md
+++ b/docs/Trident-API-Overview.md
@@ -299,6 +299,106 @@ Below example shows how these APIs can be used to find maximum using respective
 
 Example applications of these APIs can be located at [TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java) and [TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java)
 
+### Windowing
+Trident streams can process tuples in batches which are of the same window and emit aggregated result to the next operation.
+There are two kinds of windowing supported which are based on processing time or tuples count:
+    1. Tumbling window
+    2. Sliding window
+
+#### Tumbling window
+Tuples are grouped in a single window based on processing time or count. Any tuple belongs to only one of the windows.
+
+```java
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     */
+    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+            Fields inputFields, Aggregator aggregator, Fields functionFields);
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
+     */
+    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+            Fields inputFields, Aggregator aggregator, Fields functionFields);
+
+```
+
+#### Sliding window
+Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window.
+
+```java
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window after {@code slideCount}.
+     */
+    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+            Fields inputFields, Aggregator aggregator, Fields functionFields);
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
+     * and completes a window at {@code windowDuration}
+     */
+    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
+            WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
+```
+
+Examples of tumbling and sliding windows can be found [here](Windowing.html)
+
+#### Common windowing API
+Below is the common windowing API which takes `WindowConfig` for any supported windowing configurations.
+
+```java
+
+    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+            Aggregator aggregator, Fields functionFields)
+
+```
+
+`windowConfig` can be any of the below.
+ - `SlidingCountWindow.of(int windowCount, int slidingCount)`
+ - `SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)`
+ - `TumblingCountWindow.of(int windowLength)`
+ - `TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)`
+
+
+Trident windowing APIs need `WindowsStoreFactory` to store received tuples and aggregated values. Currently, basic implementation
+for HBase is given with `HBaseWindowsStoreFactory`. It can further be extended to address respective usecases.
+Example of using `HBaseWindowStoreFactory` for windowing can be seen below.
+
+```java
+
+    // window-state table should already be created with cf:tuples column
+    HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+            new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+            new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+    spout.setCycle(true);
+
+    TridentTopology topology = new TridentTopology();
+
+    Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+            new Split(), new Fields("word"))
+            .window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+            .peek(new Consumer() {
+                @Override
+                public void accept(TridentTuple input) {
+                    LOG.info("Received tuple: [{}]", input);
+                }
+            });
+
+    StormTopology stormTopology = topology.build();
+
+```
+
+Detailed description of all the above APIs in this section can be found [here](javadocs/org/apache/storm/trident/Stream.html)
+
+#### Example applications
+Example applications of these APIs are located at [TridentHBaseWindowingStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java)
+and [TridentWindowingInmemoryStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java)
+
+
 ### partitionAggregate
 
 partitionAggregate runs a function on each partition of a batch of tuples. Unlike functions, the tuples emitted by partitionAggregate replace the input tuples given to it. Consider this example: