Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r128289791

--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.

 An example toplogy `SlidingWindowTopology` shows how to use the apis to compute a sliding window sum and a tumbling window average.

+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory until they are processed and expired from the
+window. This limits the use cases to windows that
+fit entirely in memory. Also the source tuples cannot be ack-ed until the window expiry requiring large message timeouts
+(topology.message.timeout.secs should be larger than the window length + sliding interval). This also puts extra loads
+due to the complex acking and anchoring requirements.
+
+To address the above limitations and to support larger window sizes, storm provides stateful windowing support via `IStatefulWindowedBolt`.
+User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations with the framework automatically
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of
+failures or restarts. During recovery, the tuples with message ids lower than last expired id are discarded and tuples with
+message id between the last expired and last evaluated message ids are fed into the system without activating any triggers.
+The tuples beyond the last evaluated message ids are processed as usual. This can be enabled by setting
+the `messageIdField` as shown below,
+
+```java
+topologyBuilder.setBolt("mybolt",
+                   new MyStatefulWindowedBolt()
+                   .withWindow(...) // windowing configuarations
+                   .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple
+                   parallelism)
+               .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same is maintained
+while re-emitting the messages in case of failures. Here the tuples in window are still held in memory.
+
+For more details take a look at the sample topology in storm starter `StatefulWindowingTopology` which will help you get started.
+
+### Window checkpointing
+
+With window checkpointing, the monotonically increasing id is no-longer required since the framework transparently saves the state of the window periodically into the configured state backend.
+The state that is saved includes the tuples in the window, any system state that is required to recover the state of processing
+and also the user state.
+
+```java
+topologyBuilder.setBolt("mybolt",
+                   new MyStatefulPersistentWindowedBolt()
+                   .withWindow(...) // windowing configuarations
+                   .withPersistence() // persist the window state
+                   .withMaxEventsInMemory(25000), // max number of events to be cached in memory
+                   parallelism)
+               .shuffleGrouping("spout");
+
+```
+
+The `withPersistence` instructs the framework to transparently save the tuples in window along with
+any associated system and user state to the state backend. The `withMaxEventsInMemory` is an optional
+configuration that specifies the maximum number of tuples that may be kept in memory. The tuples are transparently loaded from
+the state backend as required and the most recently used tuples are retained in memory (backed by a LRU cache).

--- End diff --

@HeartSaVioR, replaced Guava with Caffeine, which has better cache hit rate with sequential access and is based on LFU. I ran some tests with sequential queue scans (one of the typical operations in windowing) and it gave much better results.
Caffeine is also API-compatible with Guava and is already used elsewhere in Storm.
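
To illustrate why sequential scans are a weak spot for a pure LRU policy, here is a minimal sketch that uses only the JDK's `LinkedHashMap`. It does not use Guava or Caffeine and is not the test mentioned above. The class `LruScanDemo`, the method `hitsForCyclicScan`, and the sizes are hypothetical names and values chosen for illustration. The sketch repeatedly scans a queue of keys in order and counts cache hits.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LruScanDemo {

    /** Minimal LRU cache: an access-ordered LinkedHashMap that evicts its eldest entry. */
    static final class LruCache<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // accessOrder = true gives LRU ordering
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity; // evict once the cache exceeds its capacity
        }
    }

    /** Scans keys 0..queueLength-1 in order, 'passes' times, and returns the number of cache hits. */
    static int hitsForCyclicScan(int capacity, int queueLength, int passes) {
        LruCache<Integer, String> cache = new LruCache<>(capacity);
        int hits = 0;
        for (int p = 0; p < passes; p++) {
            for (int key = 0; key < queueLength; key++) {
                if (cache.get(key) != null) {
                    hits++;
                } else {
                    // Stand-in for loading a tuple from the state backend (assumption).
                    cache.put(key, "tuple-" + key);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Queue one element larger than the cache: every lookup misses.
        System.out.println(hitsForCyclicScan(3, 4, 5)); // prints 0
        // Queue fits in the cache: every lookup after the first pass hits.
        System.out.println(hitsForCyclicScan(4, 4, 5)); // prints 16
    }
}
```

With a queue just one element larger than the cache, each scan evicts exactly the entry that will be needed next, so the hit count stays at zero no matter how many passes are made. The sketch only shows this LRU behaviour; it makes no claim about how Caffeine performs on the same access pattern.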