Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2218#discussion_r128156078
  
    --- Diff: docs/Windowing.md ---
    @@ -266,3 +266,105 @@ tuples can be received within the timeout period.
     An example topology `SlidingWindowTopology` shows how to use the APIs to compute a sliding window sum and a tumbling window
     average.
     
    +## Stateful windowing
    +The default windowing implementation in Storm stores the tuples in memory until they are processed and expired from the
    +window. This limits the use cases to windows that
    +fit entirely in memory. Also, the source tuples cannot be acked until the window expires, which requires large message timeouts
    +(`topology.message.timeout.secs` should be larger than the window length plus the sliding interval). This also adds extra load
    +due to the complex acking and anchoring requirements.
    + 
    +To address the above limitations and to support larger window sizes, Storm provides stateful windowing support via `IStatefulWindowedBolt`.
    +User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations, and the framework automatically
    +manages the state of the window in the background.
    +
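    +A minimal sketch of such a bolt, assuming a tuple field named `value` and a running sum kept in a `KeyValueState`
    +(the field and key names are illustrative; see the `StatefulWindowingTopology` sample in storm-starter for a complete example):
    +
    +```java
    +import java.util.Map;
    +
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.windowing.TupleWindow;
    +
    +public class WindowSumBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Long>> {
    +    private KeyValueState<String, Long> state;
    +    private long sum;
    +    private OutputCollector collector;
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +    }
    +
    +    @Override
    +    public void initState(KeyValueState<String, Long> state) {
    +        this.state = state;
    +        // restore the previously saved sum (if any) from the state backend
    +        sum = state.get("sum", 0L);
    +    }
    +
    +    @Override
    +    public void execute(TupleWindow inputWindow) {
    +        for (Tuple tuple : inputWindow.get()) {
    +            sum += tuple.getLongByField("value");
    +        }
    +        // save the running sum; the framework checkpoints the state in the background
    +        state.put("sum", sum);
    +        collector.emit(new Values(sum));
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        declarer.declare(new Fields("sum"));
    +    }
    +}
    +```
    +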
    +If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this
    +to periodically checkpoint the last expired and last evaluated message ids, to avoid duplicate window evaluations in case of
    +failures or restarts. During recovery, tuples with message ids lower than the last expired id are discarded, and tuples with
    +message ids between the last expired and the last evaluated ids are fed into the system without activating any triggers.
    +Tuples beyond the last evaluated message id are processed as usual. This can be enabled by setting
    +the `messageIdField` as shown below:
    +
    +```java
    +topologyBuilder.setBolt("mybolt",
    +                   new MyStatefulWindowedBolt()
    +                   .withWindow(...) // windowing configurations
    +                   .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple
    +                   parallelism)
    +               .shuffleGrouping("spout");
    +```
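    +
    +The recovery behaviour described above can be pictured roughly as in the sketch below. This is only an illustration of
    +the rule (not the framework's actual code); `lastExpiredId` and `lastEvaluatedId` stand for the checkpointed values:
    +
    +```java
    +// Illustrative sketch: how a replayed tuple's message id is classified during
    +// recovery, given the checkpointed last expired and last evaluated ids.
    +public class RecoveryClassifier {
    +    public enum Action { DISCARD, RESTORE_WITHOUT_TRIGGERS, PROCESS_AS_USUAL }
    +
    +    public Action classify(long msgId, long lastExpiredId, long lastEvaluatedId) {
    +        if (msgId < lastExpiredId) {
    +            return Action.DISCARD;                  // already expired before the failure
    +        } else if (msgId <= lastEvaluatedId) {
    +            return Action.RESTORE_WITHOUT_TRIGGERS; // re-fed into the window, no triggers fired
    +        } else {
    +            return Action.PROCESS_AS_USUAL;         // beyond the last evaluated id
    +        }
    +    }
    +}
    +```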
    +
    +However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same
    +identifier is maintained when the messages are re-emitted in case of failures. With this option, the tuples in the window are still held in memory.
    +
    +For more details, take a look at the sample topology `StatefulWindowingTopology` in storm-starter, which will help you get started.
    +
    +### Window checkpointing
    +
    +With window checkpointing, the monotonically increasing id is no longer required since the framework transparently saves
    +the state of the window periodically into the configured state backend.
    +The state that is saved includes the tuples in the window, any system state that is required to recover the state of processing,
    +and also the user state.
    +
    +```java
    +topologyBuilder.setBolt("mybolt",
    +                   new MyStatefulPersistentWindowedBolt()
    +                   .withWindow(...) // windowing configurations
    +                   .withPersistence() // persist the window state
    +                   .withMaxEventsInMemory(25000), // max number of events to be cached in memory
    +                   parallelism)
    +               .shuffleGrouping("spout");
    +```
    +
    +The `withPersistence` option instructs the framework to transparently save the tuples in the window along with
    +any associated system and user state to the state backend. `withMaxEventsInMemory` is an optional
    +configuration that specifies the maximum number of tuples that may be kept in memory. The tuples are transparently loaded from
    +the state backend as required, and the most recently used tuples are retained in memory (backed by an LRU cache).
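    +
    +The LRU behaviour mentioned here can be pictured with a plain Java `LinkedHashMap` kept in access order
    +(a conceptual illustration of an LRU cache only, not Storm's internal implementation):
    +
    +```java
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +
    +import org.apache.storm.tuple.Tuple;
    +
    +public class LruExample {
    +    // entries are ordered by access; the least recently accessed entry is
    +    // evicted once the configured capacity is exceeded
    +    public static Map<Long, Tuple> newLruCache(final int maxEventsInMemory) {
    +        return new LinkedHashMap<Long, Tuple>(16, 0.75f, true) {
    +            @Override
    +            protected boolean removeEldestEntry(Map.Entry<Long, Tuple> eldest) {
    +                return size() > maxEventsInMemory;
    +            }
    +        };
    +    }
    +}
    +```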
    --- End diff --
    
By 'used' I meant accessed. This could be the latest partition where the events are being added, or the partitions that were accessed recently during window activation. Yes, it's true that the older partitions need to be accessed before the recent ones while iterating the window. Right now we invalidate any empty partitions during checkpointing. I will see how we can further optimize to keep the older partitions (the queue head) cached.

