[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15091954#comment-15091954
 ] 

Satish Duggana commented on STORM-676:
--------------------------------------

[~revans2] Please find my comments below.

The document seems to make sense, but it is at a very high level.  It says you 
have a working prototype for windowing.  Could you post something describing 
the changes you had to make to the coordinators, or how the coordination works 
to support the windowing operations?
<Satish> No changes were made to the batch coordinators for this feature, 
because the chosen approach works at the operator/bolt level. Coordinators 
replay batches whenever there is a failure, and the window operator’s bolt 
receives the replayed batches. I have a basic prototype, but it does not cover 
all the details mentioned here. I can start working on this design and raise a 
PR soon if the chosen approach seems to be heading in the right direction.

Also an explanation of how the windowed state will be stored would be very 
good. I would like to see both the format and how the bolts communicate the 
state to the state store.
<Satish> The window processor is an implementation of TridentProcessor. It 
holds a state-store instance (the user passes its factory along with the window 
operations) and a window-manager instance. It takes tuples and accumulates them 
in an in-memory collection. When a batch is finished, its tuples are added to 
the window state store and to the window manager. The window manager determines 
when to trigger or evict based on the windowing configuration.
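
The per-batch accumulation described above could look roughly like the 
following. This is only a sketch, not the prototype code: the class name 
BatchAccumulatorSketch, the plain Map standing in for the window state store, 
and the List<Object> standing in for a Trident tuple are all assumptions made 
for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the window processor's batch handling.
final class BatchAccumulatorSketch {
    // Tuples of batches that have not finished yet, keyed by batch id.
    private final Map<Long, List<List<Object>>> inFlight = new HashMap<>();
    // Stand-in for the window state store (assumption: a simple key/value map).
    private final Map<String, List<List<Object>>> store;
    // component_id|task_id| prefix shared by every key this task writes.
    private final String keyPrefix;

    BatchAccumulatorSketch(String componentId, int taskId,
                           Map<String, List<List<Object>>> store) {
        this.keyPrefix = componentId + "|" + taskId + "|";
        this.store = store;
    }

    // Called once per tuple: only accumulates in memory.
    void execute(long batchId, List<Object> tuple) {
        inFlight.computeIfAbsent(batchId, id -> new ArrayList<>()).add(tuple);
    }

    // Called when the batch is complete: moves the batch into the store.
    void finishBatch(long batchId) {
        List<List<Object>> tuples = inFlight.remove(batchId);
        if (tuples == null) {
            tuples = new ArrayList<>(); // batch without tuples
        }
        store.put(keyPrefix + batchId, tuples);
    }
}
```

Under these assumptions a replayed batch writes to the same key again, so the 
replay overwrites the earlier entry instead of duplicating it.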

window-manager:
It contains a list of tuple references that hold only the batch_id, 
tuple_index, and timestamp; the actual tuple field values are kept in the 
window state store. The window manager fires triggers/evictors based on the 
window configuration. Currently, when a trigger fires, it accumulates the 
triggered tuples and keeps them in the store. The plan, however, is to compute 
the aggregated value of those triggered tuples, add it to the store, and 
update the expired tuples of the respective batches in the store.
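
To illustrate the idea of keeping only lightweight references in the window 
manager, here is a minimal sketch. It assumes a time-based sliding window and 
references arriving in timestamp order; TupleRef, WindowManagerSketch, and the 
method names are hypothetical and not taken from the prototype.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Reference to a tuple whose field values live in the window state store.
final class TupleRef {
    final long batchId;
    final int tupleIndex;
    final long timestampMs;

    TupleRef(long batchId, int tupleIndex, long timestampMs) {
        this.batchId = batchId;
        this.tupleIndex = tupleIndex;
        this.timestampMs = timestampMs;
    }
}

// Assumption: a sliding window defined only by its length in milliseconds.
final class WindowManagerSketch {
    private final long windowLengthMs;
    private final Deque<TupleRef> refs = new ArrayDeque<>();

    WindowManagerSketch(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    // Assumes references are added in non-decreasing timestamp order.
    void add(TupleRef ref) {
        refs.addLast(ref);
    }

    // Removes and returns references that fell out of the window, so the
    // caller can update the corresponding entries in the state store.
    List<TupleRef> evictExpired(long nowMs) {
        List<TupleRef> expired = new ArrayList<>();
        while (!refs.isEmpty()
                && refs.peekFirst().timestampMs <= nowMs - windowLengthMs) {
            expired.add(refs.pollFirst());
        }
        return expired;
    }

    // Snapshot of the references currently inside the window.
    List<TupleRef> currentWindow() {
        return new ArrayList<>(refs);
    }
}
```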

window state store:
Current representation of tuples: Map of <component_id | task_id | batch_id> 
to <List of Trident tuples>
It may be changed to: Map of <component_id | task_id | batch_id | tuple_index> 
to <TridentTuple>
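
A small sketch of how such composite keys might be built. The WindowKeys 
class, its method names, and the use of a plain "|" delimiter in a String key 
are assumptions for illustration only.

```java
// Hypothetical key helper; not part of Storm or of the prototype.
final class WindowKeys {
    private static final String SEP = "|";

    private WindowKeys() {
    }

    // Prefix shared by all tuples of one batch: component_id|task_id|batch_id|
    // The trailing separator keeps batch 42 from also matching batch 421.
    static String batchPrefix(String componentId, int taskId, long batchId) {
        return componentId + SEP + taskId + SEP + batchId + SEP;
    }

    // Key of a single tuple: component_id|task_id|batch_id|tuple_index
    static String tupleKey(String componentId, int taskId, long batchId,
                           int tupleIndex) {
        return batchPrefix(componentId, taskId, batchId) + tupleIndex;
    }
}
```

With per-tuple keys, all tuples of a batch can still be fetched together 
through a prefix lookup on the batch prefix.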

Currently, I have created a MapStore with the following API:
public static interface MapStore<K, V> {
    public V get(K k);
    public Collection<V> getWithPrefixKey(K k);
    public void put(K k, V v);
    public boolean remove(K k);
}
However, storm.trident.state.map.MapState could be used or enhanced for the 
same purpose. For better performance, range-based APIs for get and put may need 
to be added.
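
For illustration, an in-memory implementation of this interface might look 
like the sketch below. It assumes String keys and that getWithPrefixKey returns 
the values of all keys starting with the given prefix; InMemoryMapStore is a 
hypothetical name, and the interface is repeated as a top-level type only to 
keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Same methods as the MapStore API above, declared top-level for this sketch.
interface MapStore<K, V> {
    V get(K k);
    Collection<V> getWithPrefixKey(K k);
    void put(K k, V v);
    boolean remove(K k);
}

// Assumption: keys are Strings, so "prefix" means String.startsWith.
final class InMemoryMapStore<V> implements MapStore<String, V> {
    // Sorted map: keys sharing a prefix are stored next to each other.
    private final TreeMap<String, V> entries = new TreeMap<>();

    @Override
    public V get(String key) {
        return entries.get(key);
    }

    @Override
    public Collection<V> getWithPrefixKey(String prefix) {
        List<V> result = new ArrayList<>();
        // Start at the first key >= prefix and stop at the first mismatch.
        for (Map.Entry<String, V> e : entries.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            result.add(e.getValue());
        }
        return result;
    }

    @Override
    public void put(String key, V value) {
        entries.put(key, value);
    }

    @Override
    public boolean remove(String key) {
        // Assumes non-null values; returns true if a mapping was removed.
        return entries.remove(key) != null;
    }
}
```

A sorted map makes the prefix lookup a range scan, which is the same kind of 
access a range-based get would need.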

trigger-state representation:
Currently, Map of <component_id | task_id | ‘$trigger’ | trigger_id> to a 
Collection of TridentTuples, where trigger_id is an integer sequence number.
The value in the above Map can instead be a backtype.storm.tuple.Values 
instance, since the aggregation can be done when a trigger is fired.

When finishBatch is invoked on the WindowProcessor, it checks whether there are 
any pending triggers to be emitted. The results of these triggers are emitted 
to the collector as part of the current batch.
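
The draining of pending triggers in finishBatch can be sketched as follows. 
PendingTriggersSketch is a hypothetical class, the Consumer stands in for the 
Trident collector, and single-threaded access is assumed.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical holder for trigger results awaiting emission.
final class PendingTriggersSketch {
    // Results of fired triggers that have not been emitted yet.
    private final Queue<List<Object>> pending = new ArrayDeque<>();

    // Called when the window manager fires a trigger.
    void onTrigger(List<Object> triggerResult) {
        pending.add(triggerResult);
    }

    // Called at the end of a batch: emits every pending result through the
    // given collector and returns how many results were emitted.
    int finishBatch(Consumer<List<Object>> collector) {
        int emitted = 0;
        List<Object> result;
        while ((result = pending.poll()) != null) {
            collector.accept(result);
            emitted++;
        }
        return emitted;
    }
}
```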

Will there be any aggregation before writing the state?
<Satish> Currently, triggers do not store the aggregated values, but the plan 
is to compute the aggregation and store it.

How will you handle a topology coming down and a new one with a different 
parallelism of bolts/spouts coming back up?  I would suspect that it should not 
be a problem as we can aggregate over the old parallelism + the new 
parallelism, but I want to be sure we take that into account.
<Satish> Right. Whenever a bolt is prepared, it fetches the current state for 
its componentId/taskId from the window state store and creates a window-manager 
that holds the windowing data for earlier unfinished tuples and pending 
triggers.
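
A rough sketch of that restore step, assuming String keys in the 
component_id|task_id|... layout described earlier and a sorted map standing in 
for the window state store. RestoredState and StateRestoreSketch are 
hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Keys found for one component/task, split by kind.
final class RestoredState {
    final List<String> batchKeys = new ArrayList<>();
    final List<String> triggerKeys = new ArrayList<>();
}

final class StateRestoreSketch {
    static final String TRIGGER_MARKER = "$trigger";

    private StateRestoreSketch() {
    }

    // Scans all keys that belong to componentId/taskId and separates stored
    // batches from pending triggers.
    static RestoredState restore(TreeMap<String, ?> store,
                                 String componentId, int taskId) {
        String prefix = componentId + "|" + taskId + "|";
        RestoredState state = new RestoredState();
        for (String key : store.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // sorted order: no later key can match
            }
            String rest = key.substring(prefix.length());
            if (rest.startsWith(TRIGGER_MARKER + "|")) {
                state.triggerKeys.add(key);
            } else {
                state.batchKeys.add(key);
            }
        }
        return state;
    }
}
```

The restored keys would then be used to rebuild the window manager's tuple 
references and its list of pending triggers.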

> Storm Trident support sliding windows
> -------------------------------------
>
>                 Key: STORM-676
>                 URL: https://issues.apache.org/jira/browse/STORM-676
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Sriharsha Chintalapani
>            Assignee: Satish Duggana
>         Attachments: StormTrident_windowing_support-676.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
