[ https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585912#comment-16585912 ]

ASF GitHub Bot commented on FLINK-10068:
----------------------------------------

asfgit closed pull request #6504: [FLINK-10068][docs] Add documentation for 
RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/stream/operators/process_function.md 
b/docs/dev/stream/operators/process_function.md
index 4f36721adc7..b2a373eaa5f 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are 
internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is 
at most one timer per key and timestamp. If multiple timers are registered for 
the same timestamp, the `onTimer()` method will be called just once.
 
-**Note:** Flink synchronizes invocations of `onTimer()` and 
`processElement()`. Hence, users do not have to worry about concurrent 
modification of state.
+<span class="label label-info">Note</span> Flink synchronizes invocations of 
`onTimer()` and `processElement()`. Hence, users do not have to worry about 
concurrent modification of state.
 
 ### Fault Tolerance
 
 Timers are fault tolerant and checkpointed along with the state of the 
application. 
 In case of a failure recovery or when starting an application from a 
savepoint, the timers are restored.
 
-**Note:** Checkpointed processing-time timers that were supposed to fire 
before their restoration, will fire immediately. 
+<span class="label label-info">Note</span> Checkpointed processing-time timers 
that were supposed to fire before their restoration will fire immediately.
 This might happen when an application recovers from a failure or when it is 
started from a savepoint.
 
-**Note:** Timers are always synchronously checkpointed, regardless of the 
configuration of the state backends. 
-Therefore, a large number of timers can significantly increase checkpointing 
time. 
-See the "Timer Coalescing" section for advice on how to reduce the number of 
timers.
+<span class="label label-info">Note</span> Timers are always asynchronously 
checkpointed, except for the combination of the RocksDB backend with 
incremental snapshots and heap-based timers (this will be resolved with 
`FLINK-10026`).
+Notice that large numbers of timers can increase the checkpointing time 
because timers are part of the checkpointed state. See the "Timer Coalescing" 
section for advice on how to reduce the number of timers.
 
 ### Timer Coalescing
 
@@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
 </div>
 </div>
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+Stopping an event-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-info">Note</span> Stopping a timer has no effect if 
no timer with the given timestamp is registered.
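
For illustration beyond the documented snippets above, here is a minimal,
hypothetical sketch (not part of this PR) of how timer registration and the
`deleteProcessingTimeTimer()` call could be combined inside a `ProcessFunction`
applied to a keyed stream; the class and state names are made up for the
example:

{% highlight java %}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: keep at most one pending processing-time timer per key
// by deleting the previously registered timer before registering a new one.
public class LatestTimerFunction extends ProcessFunction<Long, String> {

    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pending-timer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        Long previous = pendingTimer.value();
        if (previous != null) {
            // deleting a timer that is not registered has no effect (see note above)
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + 60 * 1000;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        pendingTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // onTimer() and processElement() are synchronized, so state access here is safe
        pendingTimer.clear();
        out.collect("timer fired at " + timestamp);
    }
}
{% endhighlight %}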
diff --git a/docs/ops/state/large_state_tuning.md 
b/docs/ops/state/large_state_tuning.md
index 6df551f32da..62b3ee557f6 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate 
a `RocksDBStateBackend
         new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
+**RocksDB Timers**
+
+For RocksDB, a user can choose whether timers are stored on the heap 
(default) or inside RocksDB. Heap-based timers can have better performance 
for smaller numbers of
+timers, while storing timers inside RocksDB offers higher scalability, as 
the number of timers in RocksDB can exceed the available main memory 
(spilling to disk).
+
+When using RocksDB as the state backend, the type of timer storage can be 
selected through Flink's configuration via the option key 
`state.backend.rocksdb.timer-service.factory`.
+Possible choices are `heap` (to store timers on the heap, default) and 
`rocksdb` (to store timers in RocksDB).
+
+<span class="label label-info">Note</span> *The combination of the RocksDB 
state backend with incremental checkpoints and heap-based timers currently 
does NOT support asynchronous snapshots for the timer state.
+Other state, such as keyed state, is still snapshotted asynchronously. Please 
note that this is not a regression from previous versions and will be resolved 
with `FLINK-10026`.*
+
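
For illustration (a sketch that is not part of this diff), selecting the
RocksDB-based timer storage via the option key documented above could look
like this in `flink-conf.yaml`:

{% highlight yaml %}
# store timers in RocksDB instead of on the heap (default value: heap)
state.backend.rocksdb.timer-service.factory: rocksdb
{% endhighlight %}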
 **Passing Options to RocksDB**
 
 {% highlight java %}
@@ -177,11 +188,10 @@ Flink provides some predefined collections of option for 
RocksDB for different s
 We expect to accumulate more such profiles over time. Feel free to contribute 
such predefined option profiles when you
 found a set of options that work well and seem representative for certain 
workloads.
 
-**Important:** RocksDB is a native library, whose allocated memory not from 
the JVM, but directly from the process'
-native memory. Any memory you assign to RocksDB will have to be accounted for, 
typically by decreasing the JVM heap size
+<span class="label label-info">Note</span> RocksDB is a native library that 
allocates memory directly from the process's native memory,
+and not from the JVM. Any memory you assign to RocksDB will have to be 
accounted for, typically by decreasing the JVM heap size
 of the TaskManagers by the same amount. Not doing that may result in 
YARN/Mesos/etc terminating the JVM processes for
-allocating more memory than configures.
-
+allocating more memory than configured.
 
 ## Capacity Planning
 
@@ -231,7 +241,7 @@ Compression can be activated through the `ExecutionConfig`:
                executionConfig.setUseSnapshotCompression(true);
 {% endhighlight %}
 
-**Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
+<span class="label label-info">Note</span> The compression option has no 
impact on incremental snapshots, because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
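
As a small illustrative sketch (not part of this diff), the compression flag
is typically set on the `ExecutionConfig` obtained from the execution
environment:

{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enables compression for full snapshots of keyed state; incremental RocksDB
// snapshots are unaffected, as noted above
env.getConfig().setUseSnapshotCompression(true);
{% endhighlight %}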
 
 ## Task-Local Recovery
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 3d4ce586759..4e09b345e10 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -116,7 +116,8 @@ The RocksDBStateBackend is encouraged for:
 Note that the amount of state that you can keep is only limited by the amount 
of disk space available.
 This allows keeping very large state, compared to the FsStateBackend that 
keeps state in memory.
 This also means, however, that the maximum throughput that can be achieved 
will be lower with
-this state backend.
+this state backend. All reads from and writes to this backend have to go 
through de-/serialization to retrieve/store the state objects, which is more 
expensive than always working with the
+on-heap representation, as the heap-based backends do.
 
 RocksDBStateBackend is currently the only backend that offers incremental 
checkpoints (see [here](large_state_tuning.html)). 
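
For context, a minimal illustrative sketch (not part of this diff) of enabling
the RocksDB backend with incremental checkpoints on the execution environment;
the checkpoint URI is a placeholder:

{% highlight java %}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// the second constructor argument enables incremental checkpoints;
// the checkpoint URI is a placeholder
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
{% endhighlight %}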
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add documentation for async/RocksDB-based timers
> ------------------------------------------------
>
>                 Key: FLINK-10068
>                 URL: https://issues.apache.org/jira/browse/FLINK-10068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> Document how to activate RocksDB-based timers, and update that 
> snapshotting now works asynchronously, except for heap-based timers + 
> RocksDB incremental snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
