[
https://issues.apache.org/jira/browse/FLINK-10068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585912#comment-16585912
]
ASF GitHub Bot commented on FLINK-10068:
----------------------------------------
asfgit closed pull request #6504: [FLINK-10068][docs] Add documentation for
RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/docs/dev/stream/operators/process_function.md
b/docs/dev/stream/operators/process_function.md
index 4f36721adc7..b2a373eaa5f 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are
internally maintained
The `TimerService` deduplicates timers per key and timestamp, i.e., there is
at most one timer per key and timestamp. If multiple timers are registered for
the same timestamp, the `onTimer()` method will be called just once.
-**Note:** Flink synchronizes invocations of `onTimer()` and
`processElement()`. Hence, users do not have to worry about concurrent
modification of state.
+<span class="label label-info">Note</span> Flink synchronizes invocations of
`onTimer()` and `processElement()`. Hence, users do not have to worry about
concurrent modification of state.
### Fault Tolerance
Timers are fault tolerant and checkpointed along with the state of the
application.
In case of a failure recovery or when starting an application from a
savepoint, the timers are restored.
-**Note:** Checkpointed processing-time timers that were supposed to fire
before their restoration, will fire immediately.
+<span class="label label-info">Note</span> Checkpointed processing-time timers
that were supposed to fire before their restoration will fire immediately.
This might happen when an application recovers from a failure or when it is
started from a savepoint.
-**Note:** Timers are always synchronously checkpointed, regardless of the
configuration of the state backends.
-Therefore, a large number of timers can significantly increase checkpointing
time.
-See the "Timer Coalescing" section for advice on how to reduce the number of
timers.
+<span class="label label-info">Note</span> Timers are always checkpointed
asynchronously, except when the RocksDB backend is combined with incremental
snapshots and heap-based timers (will be resolved with `FLINK-10026`).
+Note that a large number of timers can increase the checkpointing time
because timers are part of the checkpointed state. See the "Timer Coalescing"
section for advice on how to reduce the number of timers.
### Timer Coalescing
@@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
{% endhighlight %}
</div>
</div>
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+Stopping an event-time timer:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-info">Note</span> Stopping a timer has no effect if
no such timer with the given timestamp is registered.
diff --git a/docs/ops/state/large_state_tuning.md
b/docs/ops/state/large_state_tuning.md
index 6df551f32da..62b3ee557f6 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate
a `RocksDBStateBackend
new RocksDBStateBackend(filebackend, true);
{% endhighlight %}
+**RocksDB Timers**
+
+For RocksDB, a user can choose whether timers are stored on the heap (default)
or inside RocksDB. Heap-based timers can perform better for smaller
numbers of
+timers, while storing timers inside RocksDB offers higher scalability as the
number of timers in RocksDB can exceed the available main memory (spilling to
disk).
+
+When using RocksDB as the state backend, the type of timer storage can be
selected through Flink's configuration via the option key
`state.backend.rocksdb.timer-service.factory`.
+Possible choices are `heap` (to store timers on the heap, default) and
`rocksdb` (to store timers in RocksDB).
+
+<span class="label label-info">Note</span> *The combination of the RocksDB
state backend with incremental checkpoints and heap-based timers currently does
NOT support asynchronous snapshots of the timer state.
+Other state, such as keyed state, is still snapshotted asynchronously. Please
note that this is not a regression from previous versions and will be resolved
with `FLINK-10026`.*
+
**Passing Options to RocksDB**
{% highlight java %}
@@ -177,11 +188,10 @@ Flink provides some predefined collections of option for
RocksDB for different s
We expect to accumulate more such profiles over time. Feel free to contribute
such predefined option profiles when you
found a set of options that work well and seem representative for certain
workloads.
-**Important:** RocksDB is a native library, whose allocated memory not from
the JVM, but directly from the process'
-native memory. Any memory you assign to RocksDB will have to be accounted for,
typically by decreasing the JVM heap size
+<span class="label label-info">Note</span> RocksDB is a native library that
allocates memory directly from the process,
+and not from the JVM. Any memory you assign to RocksDB will have to be
accounted for, typically by decreasing the JVM heap size
of the TaskManagers by the same amount. Not doing that may result in
YARN/Mesos/etc terminating the JVM processes for
-allocating more memory than configures.
-
+allocating more memory than configured.
## Capacity Planning
@@ -231,7 +241,7 @@ Compression can be activated through the `ExecutionConfig`:
executionConfig.setUseSnapshotCompression(true);
{% endhighlight %}
-**Notice:** The compression option has no impact on incremental snapshots,
because they are using RocksDB's internal
+<span class="label label-info">Note</span> The compression option has no
impact on incremental snapshots, because they are using RocksDB's internal
format which is always using snappy compression out of the box.
## Task-Local Recovery
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 3d4ce586759..4e09b345e10 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -116,7 +116,8 @@ The RocksDBStateBackend is encouraged for:
Note that the amount of state that you can keep is only limited by the amount
of disk space available.
This allows keeping very large state, compared to the FsStateBackend that
keeps state in memory.
This also means, however, that the maximum throughput that can be achieved
will be lower with
-this state backend.
+this state backend. All reads/writes from/to this backend have to go through
de-/serialization to retrieve/store the state objects, which is also more
expensive than always working with the
+on-heap representation as the heap-based backends are doing.
RocksDBStateBackend is currently the only backend that offers incremental
checkpoints (see [here](large_state_tuning.html)).
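The timer semantics described in the diff above (at most one timer per key and timestamp, and deletion being a no-op when no matching timer exists) can be illustrated with a small plain-Java sketch. This is a toy model for illustration only: the class `ToyTimerService` and its methods are hypothetical names and do not reflect Flink's actual `TimerService` implementation, which also handles firing, time domains, and checkpointing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Toy model of per-key timer bookkeeping. Hypothetical; NOT Flink's TimerService. */
public class ToyTimerService {
    // For each key, the sorted set of timestamps that currently have a timer.
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    /** Registers a timer; returns false if (key, timestamp) is already registered. */
    public boolean register(String key, long timestamp) {
        return timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Deletes a timer; returns false (no effect) if no such timer is registered. */
    public boolean delete(String key, long timestamp) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers != null && timers.remove(timestamp);
    }

    /** Returns the number of timers currently registered for the key. */
    public int count(String key) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.register("user-1", 1000L);
        service.register("user-1", 1000L); // duplicate (same key and timestamp), ignored
        service.delete("user-1", 2000L);   // no timer at this timestamp, no effect
        System.out.println(service.count("user-1")); // prints 1
        service.delete("user-1", 1000L);
        System.out.println(service.count("user-1")); // prints 0
    }
}
```

Keeping a set of timestamps per key is what makes both properties fall out naturally in this model: a second registration of the same pair does not add an entry, and removing an absent entry changes nothing.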
> Add documentation for async/RocksDB-based timers
> ------------------------------------------------
>
> Key: FLINK-10068
> URL: https://issues.apache.org/jira/browse/FLINK-10068
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> Document how to activate RocksDB-based timers, and update the docs to state
> that snapshotting now works asynchronously (except for heap timers +
> RocksDB incremental snapshots).