This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b456cca8c92adf62e7e58a495f9692b2c9c2cb96
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Mon Aug 6 14:54:04 2018 +0200

    [FLINK-10068][docs] Add documentation for RocksDB-based timers and stopping timers
    
    This closes #6504.
---
 docs/dev/stream/operators/process_function.md | 49 ++++++++++++++++++++++++---
 docs/ops/state/large_state_tuning.md          | 20 ++++++++---
 docs/ops/state/state_backends.md              |  3 +-
 3 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index 4f36721..b2a373e 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once.
 
-**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
+<span class="label label-info">Note</span> Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state.
 
 ### Fault Tolerance
 
 Timers are fault tolerant and checkpointed along with the state of the application.
 In case of a failure recovery or when starting an application from a savepoint, the timers are restored.
 
-**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
+<span class="label label-info">Note</span> Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately.
 This might happen when an application recovers from a failure or when it is started from a savepoint.
 
-**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends.
-Therefore, a large number of timers can significantly increase checkpointing time.
-See the "Timer Coalescing" section for advice on how to reduce the number of timers.
+<span class="label label-info">Note</span> Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`).
+Notice that large numbers of timers can increase the checkpointing time because timers are part of the checkpointed state. See the "Timer Coalescing" section for advice on how to reduce the number of timers.
 
 ### Timer Coalescing
 
@@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
 </div>
 </div>
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+Stopping an event-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-info">Note</span> Stopping a timer has no effect if no such timer with the given timestamp is registered.
diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index 6df551f..62b3ee5 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
         new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
+**RocksDB Timers**
+
+For RocksDB, a user can choose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller numbers of
+timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).
+
+When using RocksDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `state.backend.rocksdb.timer-service.factory`.
+Possible choices are `heap` (to store timers on the heap, default) and `rocksdb` (to store timers in RocksDB).
+
+<span class="label label-info">Note</span> *The combination RocksDB state backend / with incremental checkpoint / with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
+Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
+
 **Passing Options to RocksDB**
 
 {% highlight java %}
@@ -177,11 +188,10 @@ Flink provides some predefined collections of option for RocksDB for different s
 We expect to accumulate more such profiles over time. Feel free to contribute such predefined option profiles when you
 found a set of options that work well and seem representative for certain workloads.
 
-**Important:** RocksDB is a native library, whose allocated memory not from the JVM, but directly from the process'
-native memory. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
+<span class="label label-info">Note</span> RocksDB is a native library that allocates memory directly from the process,
+and not from the JVM. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
 of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
-allocating more memory than configures.
-
+allocating more memory than configured.
 
 ## Capacity Planning
 
@@ -231,7 +241,7 @@ Compression can be activated through the `ExecutionConfig`:
                executionConfig.setUseSnapshotCompression(true);
 {% endhighlight %}
 
-**Notice:** The compression option has no impact on incremental snapshots, because they are using RocksDB's internal
+<span class="label label-info">Note</span> The compression option has no impact on incremental snapshots, because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
 ## Task-Local Recovery
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 3d4ce58..4e09b34 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -116,7 +116,8 @@ The RocksDBStateBackend is encouraged for:
 Note that the amount of state that you can keep is only limited by the amount of disk space available.
 This allows keeping very large state, compared to the FsStateBackend that keeps state in memory.
 This also means, however, that the maximum throughput that can be achieved will be lower with
-this state backend.
+this state backend. All reads/writes from/to this backend have to go through de-/serialization to retrieve/store the state objects, which is also more expensive than always working with the
+on-heap representation as the heap-based backends are doing.
 
 RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see [here](large_state_tuning.html)).
 
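
The timer storage option documented in the `large_state_tuning.md` change above could be set as in the following sketch. The option key and its two values are taken from the added documentation; placing the entry in `flink-conf.yaml` is an assumption, since the commit only says "Flink's configuration", and the sketch also assumes the job already uses the RocksDB state backend.

```yaml
# flink-conf.yaml (assumed location of Flink's configuration; not named in this commit)
# Store timers inside RocksDB instead of on the JVM heap.
# Values listed in the added documentation: heap (default), rocksdb
state.backend.rocksdb.timer-service.factory: rocksdb
```

According to the notes added in this commit, heap-based timers combined with the RocksDB backend and incremental snapshots are the one case where timer state is not snapshotted asynchronously (pending `FLINK-10026`), so this setting may also be worth considering for jobs that use incremental checkpoints.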
