Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2218#discussion_r129110842
  
    --- Diff: docs/Windowing.md ---
    @@ -266,3 +266,105 @@ tuples can be received within the timeout period.
     An example toplogy `SlidingWindowTopology` shows how to use the apis to 
compute a sliding window sum and a tumbling window 
     average.
     
    +## Stateful windowing
    +The default windowing implementation in storm stores the tuples in memory 
until they are processed and expired from the 
    +window. This limits the use cases to windows that
    +fit entirely in memory. Also the source tuples cannot be ack-ed until the 
window expiry requiring large message timeouts
    +(topology.message.timeout.secs should be larger than the window length + 
sliding interval). This also puts extra loads 
    +due to the complex acking and anchoring requirements.
    + 
    +To address the above limitations and to support larger window sizes, storm 
provides stateful windowing support via `IStatefulWindowedBolt`. 
    +User bolts should typically extend `BaseStatefulWindowedBolt` for the 
windowing operations with the framework automatically 
    +managing the state of the window in the background.
    +
    +If the sources provide a monotonically increasing identifier as a part of 
the message, the framework can use this
    +to periodically checkpoint the last expired and evaluated message ids, to 
avoid duplicate window evaluations in case of 
    +failures or restarts. During recovery, the tuples with message ids lower 
than last expired id are discarded and tuples with 
    +message id between the last expired and last evaluated message ids are fed 
into the system without activating any triggers. 
    +The tuples beyond the last evaluated message ids are processed as usual. 
This can be enabled by setting
    +the `messageIdField` as shown below,
    +
    +```java
    +topologyBuilder.setBolt("mybolt",
    +                   new MyStatefulWindowedBolt()
    +                   .withWindow(...) // windowing configuarations
    +                   .withMessageIdField("msgid"), // a monotonically 
increasing 'long' field in the tuple
    +                   parallelism)
    +               .shuffleGrouping("spout");
    +```
    +
    +However, this option is feasible only if the sources can provide a 
monotonically increasing identifier in the tuple and the same is maintained
    +while re-emitting the messages in case of failures. Here the tuples in 
window are still held in memory.
    +
    +For more details take a look at the sample topology in storm starter 
`StatefulWindowingTopology` which will help you get started.
    +
    +### Window checkpointing
    +
    +With window checkpointing, the monotonically increasing id is no-longer 
required since the framework transparently saves the state of the window 
periodically into the configured state backend.
    +The state that is saved includes the tuples in the window, any system 
state that is required to recover the state of processing
    +and also the user state.
    +
    +```java
    +topologyBuilder.setBolt("mybolt",
    +                   new MyStatefulPersistentWindowedBolt()
    +                   .withWindow(...) // windowing configuarations
    +                   .withPersistence() // persist the window state
    +                   .withMaxEventsInMemory(25000), // max number of events 
to be cached in memory
    +                    parallelism)
    +               .shuffleGrouping("spout");
    +
    +```
    +
    +The `withPersistence` instructs the framework to transparently save the 
tuples in window along with
    +any associated system and user state to the state backend. The 
`withMaxEventsInMemory` is an optional 
    +configuration that specifies the maximum number of tuples that may be kept 
in memory. The tuples are transparently loaded from 
    +the state backend as required and the ones that are most likely to be used 
again are retained in memory.
    +
    +The state backend can be configured by setting the topology state provider 
config,
    +
    +```java
    +// use redis for state persistence
    +conf.put(Config.TOPOLOGY_STATE_PROVIDER, 
"org.apache.storm.redis.state.RedisKeyValueStateProvider");
    +
    +```
    +Currently storm supports Redis and HBase as state backends and uses the 
underlying state-checkpointing
    +framework for saving the window state. For more details on state 
checkpointing see [State-checkpointing.md](State-checkpointing.md)
    +
    +Here is an example of a persistent windowed bolt that uses the window 
checkpointing to save its state. The `initState`
    +is invoked with the last saved state (user state) at initialization time. 
The execute method is invoked based on the configured
    +windowing parameters and the tuples in the active window can be accessed 
via an `iterator` as shown below.
    +
    +```java
    +public class MyStatefulPersistentWindowedBolt extends 
BaseStatefulWindowedBolt<K, V> {
    +  private KeyValueState<K, V> state;
    +  
    +  @Override
    +  public void initState(KeyValueState<K, V> state) {
    +    this.state = state;
    +   // ...
    +   // restore the state from the last saved state.
    +   // ...
    +  }
    +  
    +  @Override
    +  public void execute(TupleWindow window) {      
    +    // iterate over tuples in the current window
    +    Iterator<Tuple> it = window.getIter();
    +    while (it.hasNext()) {
    +        // compute some result based on the tuples in window
    +    }
    +    
    +    // possibly update any state to be maintained across windows
    +    state.put(STATE_KEY, updatedValue);
    +    
    +    // emit the results downstream
    +    collector.emit(new Values(result));
    +  }
    +}
    +```
    +
    +Note: In case of persistent windowed bolts, use `TupleWindow.getIter` to 
retrieve an iterator over the 
    +events in the window. If the number of tuples in windows are huge, 
invoking `TupleWindow.get` would
    +try to load all the tuples into memory and may throw an OOM exception.
    +
    +For more details take a look at the sample topology in storm starter 
`PersistentWindowingTopology` which will help you get started.
    --- End diff --
    
    Nit: storm-starter, and maybe link


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to