[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15091954#comment-15091954 ]
Satish Duggana commented on STORM-676:
--------------------------------------
[~revans2] Please find my comments below.
The document seems to make sense, but it is at a very high level. It says you
have a working prototype for windowing. Could you post something describing
the changes you had to make to the coordinators, or how the coordination works,
to support the windowing operations?
<Satish> No changes were made to the batch coordinators for this feature, as
the chosen approach works at the operator/bolt level. When there are failures,
the coordinators replay the batches, and the window operator's bolt receives
the replayed batches. I have a basic prototype, but it does not cover all the
details mentioned here. I can start working on this design and raise a PR soon
if the chosen approach seems to be in the right direction.
Also, an explanation of how the windowed state will be stored would be very
helpful. I would like to see both the format and how the bolts communicate the
state to the state store.
<Satish> The window processor is an implementation of TridentProcessor. It
holds a window state store instance (whose factory is passed by the user along
with the window operations) and a window-manager instance. It accumulates the
incoming tuples in an in-memory collection. When a batch is finished, its
tuples are added to the window state store and to the window manager. The
window manager decides when to fire triggers or evict tuples based on the
windowing configuration.
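A minimal sketch of the batch flow described above, assuming tuples are modeled as List<Object> rather than TridentTuple, and with hypothetical BatchStore and BatchListener interfaces standing in for the window state store and the window manager. The method names only mirror TridentProcessor.execute and TridentProcessor.finishBatch; the real signatures take a ProcessorContext and a stream id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: buffer tuples per batch in memory, and when the batch
// finishes hand the whole batch to the state store and then to the window manager.
class WindowProcessorSketch {
    // Hypothetical stand-ins for the window state store and the window manager.
    interface BatchStore { void putBatch(String batchId, List<List<Object>> tuples); }
    interface BatchListener { void onBatch(String batchId, List<List<Object>> tuples); }

    private final BatchStore store;
    private final BatchListener windowManager;
    // In-memory accumulation for batches that are still in progress.
    private final Map<String, List<List<Object>>> pending = new HashMap<>();

    WindowProcessorSketch(BatchStore store, BatchListener windowManager) {
        this.store = store;
        this.windowManager = windowManager;
    }

    // Mirrors TridentProcessor.execute: called once per tuple of a batch.
    void execute(String batchId, List<Object> tuple) {
        pending.computeIfAbsent(batchId, k -> new ArrayList<>()).add(tuple);
    }

    // Mirrors TridentProcessor.finishBatch: called once the batch is complete.
    void finishBatch(String batchId) {
        List<List<Object>> tuples = pending.remove(batchId);
        if (tuples == null) {
            tuples = new ArrayList<>(); // batch with no tuples
        }
        store.putBatch(batchId, tuples);        // persist tuple values first
        windowManager.onBatch(batchId, tuples); // then let the manager decide on triggers/evictions
    }
}
```

Writing to the store before notifying the manager means a trigger never references tuples that are not yet persisted.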
window-manager:
It keeps a List of tuple references containing only batch_id, tuple_index, and
timestamp; the actual tuple field values are stored in the window state store.
The window manager fires triggers and evictions based on the window
configuration. Currently, when a trigger fires, the triggered tuples are
accumulated and kept in the store. The plan, however, is to compute the
aggregated value of those triggered tuples, add it to the store, and remove the
expired tuples of the respective batches from the store.
window state store:
Current representation of tuples: a Map from <component_id | task_id | batch_id>
to <List of TridentTuples>.
It may be changed to: a Map from <component_id | task_id | batch_id | tuple_index>
to <TridentTuple>.
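One way the two key layouts could be encoded as strings is sketched below. The "|" separator, String batch ids and int task ids are assumptions for illustration (Trident batch ids are actually TransactionAttempt objects), and WindowKeys is a hypothetical helper.

```java
// Hypothetical helper that builds the composite "|"-separated store keys.
final class WindowKeys {
    private WindowKeys() {}

    // <component_id | task_id | batch_id>  ->  List of tuples for the batch
    static String batchKey(String componentId, int taskId, String batchId) {
        return componentId + "|" + taskId + "|" + batchId;
    }

    // <component_id | task_id | batch_id | tuple_index>  ->  a single tuple
    static String tupleKey(String componentId, int taskId, String batchId, int tupleIndex) {
        return batchKey(componentId, taskId, batchId) + "|" + tupleIndex;
    }
}
```

Because every per-tuple key starts with its batch key, a whole batch can be fetched with one prefix lookup. The prefix should include the trailing separator (batchKey(...) + "|"), otherwise batch "b1" would also match "b10".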
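Currently, I have created a MapStore with the following API:
import java.util.Collection;

public interface MapStore<K, V> {
    // Returns the value stored under key k.
    V get(K k);
    // Returns all values whose keys start with the prefix key k.
    Collection<V> getWithPrefixKey(K k);
    // Stores value v under key k.
    void put(K k, V v);
    // Removes the entry for key k.
    boolean remove(K k);
}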
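A minimal in-memory sketch of the MapStore methods, specialised to String keys. It is a hypothetical class that does not implement the interface, so that it compiles on its own. A sorted TreeMap turns getWithPrefixKey into a range scan: all keys sharing a prefix are contiguous in lexicographic order, which is also the kind of range-based access mentioned below.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory store mirroring the MapStore methods with String keys.
class InMemoryMapStore<V> {
    private final TreeMap<String, V> map = new TreeMap<>();

    V get(String k) { return map.get(k); }

    // All values whose key starts with the prefix, in key order.
    Collection<V> getWithPrefixKey(String prefix) {
        List<V> result = new ArrayList<>();
        for (Map.Entry<String, V> e : map.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // sorted order: no later key can share the prefix
            }
            result.add(e.getValue());
        }
        return result;
    }

    void put(String k, V v) { map.put(k, v); }

    // True if a non-null value was stored under k and has been removed.
    boolean remove(String k) { return map.remove(k) != null; }
}
```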
Alternatively, storm.trident.state.map.MapState could be used or enhanced for
the same purpose. For better performance, we may need to add range-based get
and put APIs.
trigger-state representation:
Currently, a Map from <component_id | task_id | ‘$trigger’ | trigger_id> to a
Collection of TridentTuples, where trigger_id is an increasing integer sequence
number. Since aggregation can be done when a trigger is fired, the value of
this Map could instead be a backtype.storm.tuple.Values instance.
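A minimal sketch of the trigger-state layout, assuming the aggregation is a simple sum over long values (a placeholder for whatever aggregator the user supplies). backtype.storm.tuple.Values extends ArrayList<Object>, so a plain List<Object> stands in for it here; TriggerStateSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical trigger state: each fired trigger gets the next integer
// trigger_id, and only the aggregated result is stored, not the tuples.
class TriggerStateSketch {
    private final String componentId;
    private final int taskId;
    private int nextTriggerId = 0;
    private final Map<String, List<Object>> store = new TreeMap<>();

    TriggerStateSketch(String componentId, int taskId) {
        this.componentId = componentId;
        this.taskId = taskId;
    }

    // Key layout: <component_id | task_id | $trigger | trigger_id>
    String triggerKey(int triggerId) {
        return componentId + "|" + taskId + "|$trigger|" + triggerId;
    }

    // Aggregate the window's values when the trigger fires; store only the result.
    String onTrigger(List<Long> windowValues) {
        long sum = 0;
        for (long v : windowValues) {
            sum += v;
        }
        List<Object> value = new ArrayList<>(); // stands in for a Values instance
        value.add(sum);
        String key = triggerKey(nextTriggerId++);
        store.put(key, value);
        return key;
    }

    List<Object> get(String key) { return store.get(key); }
}
```

If pending triggers are later read back in order with a sorted scan, the integer trigger_id would need zero-padding, since "10" sorts before "2" as a string.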
When finishBatch is invoked on the WindowProcessor, it checks whether there are
any pending triggers to be emitted. These triggered results are emitted to the
collector as part of the current batch.
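The finishBatch step can be sketched as a queue that the window manager fills and finishBatch drains. This assumes a java.util.function.Consumer in place of the Trident output collector; PendingTriggerEmitter is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: fired triggers are queued, and finishBatch emits them
// all as part of the batch that is currently finishing.
class PendingTriggerEmitter {
    private final Queue<List<Object>> pendingTriggers = new ArrayDeque<>();

    // Called by the window manager whenever a trigger fires.
    void onTrigger(List<Object> result) {
        pendingTriggers.add(result);
    }

    // Called from finishBatch; returns how many triggered results were emitted.
    int finishBatch(Consumer<List<Object>> collector) {
        int emitted = 0;
        List<Object> result;
        while ((result = pendingTriggers.poll()) != null) {
            collector.accept(result);
            emitted++;
        }
        return emitted;
    }
}
```

Emitting inside finishBatch ties the output to the current batch, so if that batch fails and is replayed, the triggered results are emitted again with it.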
Will there be any aggregation before writing the state?
<Satish> Currently, triggers do not store aggregated values, but the plan is to
compute the aggregation and store that instead.
How will you handle a topology coming down and a new one with a different
parallelism of bolts/spouts coming back up? I would suspect that it should not
be a problem as we can aggregate over the old parallelism + the new
parallelism, but I want to be sure we take that into account.
<Satish> Right. Whenever a bolt is prepared, it fetches its current state,
keyed by componentId/taskId, from the window state store and creates a window
manager containing the windowing data for earlier unfinished tuples and
pending triggers.
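A minimal sketch of that restore step, assuming string keys of the form "<component_id>|<task_id>|..." held in a sorted map, with trigger entries marked by a "$trigger|" segment. WindowStateRecovery is hypothetical; one prefix scan returns both the unfinished tuple entries and the pending triggers for a task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

// Hypothetical restore in prepare(): scan all keys for this component/task
// and split them into tuple entries and pending trigger entries.
class WindowStateRecovery {
    static final class Restored {
        final List<String> tupleKeys = new ArrayList<>();
        final List<String> triggerKeys = new ArrayList<>();
    }

    static Restored restore(SortedMap<String, ?> store, String componentId, int taskId) {
        String prefix = componentId + "|" + taskId + "|";
        Restored restored = new Restored();
        for (Map.Entry<String, ?> e : store.tailMap(prefix).entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(prefix)) {
                break; // keys are sorted, so the prefix range has ended
            }
            if (key.startsWith(prefix + "$trigger|")) {
                restored.triggerKeys.add(key);
            } else {
                restored.tupleKeys.add(key);
            }
        }
        return restored;
    }
}
```

The trailing "|" in the prefix keeps task 3 from picking up task 30's keys. This sketch restores per task id only; after a parallelism change, a new task would not find entries under old task ids, so redistributing that state would need a component-wide scan that is not shown here.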
> Storm Trident support sliding windows
> -------------------------------------
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Attachments: StormTrident_windowing_support-676.pdf
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)