Repository: storm
Updated Branches:
  refs/heads/master 98dbdcdb5 -> d5b80fcca


STORM-1652 Added trident windowing API documentation to Trident API doc.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60f5dfbc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60f5dfbc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60f5dfbc

Branch: refs/heads/master
Commit: 60f5dfbc56d293325472459e16cec537d59192a3
Parents: 31db7dc
Author: Satish Duggana <sdugg...@hortonworks.com>
Authored: Mon Mar 28 12:48:40 2016 +0530
Committer: Satish Duggana <sdugg...@hortonworks.com>
Committed: Mon Mar 28 15:09:25 2016 +0530

----------------------------------------------------------------------
 docs/Trident-API-Overview.md | 100 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/60f5dfbc/docs/Trident-API-Overview.md
----------------------------------------------------------------------
diff --git a/docs/Trident-API-Overview.md b/docs/Trident-API-Overview.md
index d5893cc..5a0ffbc 100644
--- a/docs/Trident-API-Overview.md
+++ b/docs/Trident-API-Overview.md
@@ -299,6 +299,106 @@ Below example shows how these APIs can be used to find maximum using respective
 
 Example applications of these APIs can be located at [TridentMinMaxOfDevicesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java) and [TridentMinMaxOfVehiclesTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java)
 
 
+### Windowing
+Trident streams can process tuples in batches that belong to the same window and emit the aggregated result to the next operation.
+Two kinds of windowing are supported, each based on either processing time or tuple count:
+    1. Tumbling window
+    2. Sliding window
+
+#### Tumbling window
+Tuples are grouped into windows based on processing time or count. Each tuple belongs to exactly one window.
+
+```java 
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     */
+    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                      Fields inputFields, Aggregator aggregator, Fields functionFields);
+    
+    /**
+     * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
+     */
+    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+                                     Fields inputFields, Aggregator aggregator, Fields functionFields);
+                                     
+```
+
+#### Sliding window
+Tuples are grouped into windows, and the window slides forward at every sliding interval. A tuple can belong to more than one window.
+
+```java
+ 
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window after {@code slideCount}.
+     */
+    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                      Fields inputFields, Aggregator aggregator, Fields functionFields);
+     
+    /**
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
+     * and completes a window at {@code windowDuration}
+     */
+    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
+                                    WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
+```
+
+Examples of tumbling and sliding windows can be found [here](Windowing.html).
+
+#### Common windowing API
+The common windowing API below takes a `WindowConfig` for any of the supported windowing configurations.
+
+```java
+
+    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
+                         Aggregator aggregator, Fields functionFields)
+                         
+```
+
+`windowConfig` can be any of the following:
+ - `SlidingCountWindow.of(int windowCount, int slidingCount)`
+ - `SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)`
+ - `TumblingCountWindow.of(int windowLength)`
+ - `TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)`
+ 
+ 
+Trident windowing APIs need a `WindowsStoreFactory` to store received tuples and aggregated values. Currently, a basic implementation
+for HBase is provided as `HBaseWindowsStoreFactory`. It can be extended further to address specific use cases.
+An example of using `HBaseWindowsStoreFactory` for windowing is shown below.
+
+```java
+
+    // window-state table should already be created with cf:tuples column
+    HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
+    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+            new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+            new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+    spout.setCycle(true);
+
+    TridentTopology topology = new TridentTopology();
+
+    Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+            new Split(), new Fields("word"))
+            .window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+            .peek(new Consumer() {
+                @Override
+                public void accept(TridentTuple input) {
+                    LOG.info("Received tuple: [{}]", input);
+                }
+            });
+
+    StormTopology stormTopology =  topology.build();
+    
+```
+
+Detailed descriptions of all the APIs in this section can be found [here](javadocs/org/apache/storm/trident/Stream.html).
+
+#### Example applications
+Example applications of these APIs are located at [TridentHBaseWindowingStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java) 
+and [TridentWindowingInmemoryStoreTopology](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java) 
+
+
 ### partitionAggregate
 
 partitionAggregate runs a function on each partition of a batch of tuples. Unlike functions, the tuples emitted by partitionAggregate replace the input tuples given to it. Consider this example:
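
A note on the Windowing section added in this commit: the difference between count-based tumbling and sliding windows can be sketched in plain Java with no Storm dependency. This is only a simplified model of the semantics described in the diff, not Storm's implementation. The names `WindowSketch` and `countWindows` are hypothetical, and the sketch assumes that only full windows are emitted, which may differ from how Storm handles partial windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowSketch {

    // Hypothetical helper, not part of Storm: cuts the input into windows of
    // windowCount elements, advancing the window start by slideCount each time.
    // When slideCount == windowCount the windows tumble (no overlap);
    // when slideCount < windowCount the windows slide (elements are shared).
    // Assumption: trailing elements that do not fill a whole window are dropped.
    static List<List<Integer>> countWindows(List<Integer> tuples, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        List<List<Integer>> windows = new ArrayList<>();
        for (int start = 0; start + windowCount <= tuples.size(); start += slideCount) {
            windows.add(new ArrayList<>(tuples.subList(start, start + windowCount)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4, 5, 6);

        // Tumbling: each element lands in exactly one window.
        System.out.println(countWindows(tuples, 3, 3)); // [[1, 2, 3], [4, 5, 6]]

        // Sliding: the window advances by one element, so elements repeat.
        System.out.println(countWindows(tuples, 3, 1)); // [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
    }
}
```

In this model a tumbling window is just a sliding window whose slide equals its length, which mirrors how the `TumblingCountWindow.of(int windowLength)` and `SlidingCountWindow.of(int windowCount, int slidingCount)` configurations listed in the diff relate to each other.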
