Repository: flink
Updated Branches:
  refs/heads/release-1.5 2ae48534e -> f083622a2


[FLINK-9107] [docs] Document timer coalescing for ProcessFunction

This closes #5790.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f083622a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f083622a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f083622a

Branch: refs/heads/release-1.5
Commit: f083622a200c79395ecf16e2be6f8b540fe85178
Parents: 2ae4853
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Mar 29 16:20:00 2018 +0200
Committer: Timo Walther <twal...@apache.org>
Committed: Wed Apr 4 19:12:08 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/process_function.md | 54 +++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f083622a/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index d967983..1ed4edf 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -269,4 +269,56 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
 }
 {% endhighlight %}
 </div>
-</div>
\ No newline at end of file
+</div>
+
+## Optimizations
+
+### Timer Coalescing
+
+Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
+`registerProcessingTimeTimer()` will be stored on the Java heap and enqueued for execution. There is,
+however, a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the
+worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any
+processing for outdated timers in `onTimer`, this may put a significant burden on the
+Flink runtime.
+
+Since there is only one timer per key and timestamp, however, you may coalesce timers by reducing the
+timer resolution. For a timer resolution of 1 second (event or processing time), for example, you
+can round down the target time to full seconds and therefore allow the timer to fire at most 1
+second earlier but not later than with millisecond accuracy. As a result, there would be at most
+one timer per key and second:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
+ctx.timerService().registerProcessingTimeTimer(coalescedTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
+ctx.timerService.registerProcessingTimeTimer(coalescedTime)
+{% endhighlight %}
+</div>
+</div>
+
+Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce
+these timers with the next watermark by using the current one:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+long coalescedTime = ctx.timerService().currentWatermark() + 1;
+ctx.timerService().registerEventTimeTimer(coalescedTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val coalescedTime = ctx.timerService.currentWatermark + 1
+ctx.timerService.registerEventTimeTimer(coalescedTime)
+{% endhighlight %}
+</div>
+</div>
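
The rounding described under "Timer Coalescing" can be checked outside of Flink. The following is a minimal sketch and is not part of the commit above: the class `TimerCoalescing` and its methods `coalesce` and `distinctTimers` are hypothetical names chosen for illustration, and a plain `Set` stands in for the "one timer per key and timestamp" behaviour of the `TimerService`. It assumes non-negative timestamps, since Java integer division truncates toward zero and would round negative values up rather than down.

```java
import java.util.Set;
import java.util.TreeSet;

public class TimerCoalescing {

    /** Rounds a non-negative target time down to a multiple of resolutionMs. */
    public static long coalesce(long targetTimeMs, long resolutionMs) {
        return (targetTimeMs / resolutionMs) * resolutionMs;
    }

    /**
     * Counts the distinct timers that remain for a single key after coalescing.
     * The set models the deduplication per key and timestamp (an assumption of
     * this sketch, not a call into Flink).
     */
    public static int distinctTimers(long[] targetTimesMs, long resolutionMs) {
        Set<Long> timers = new TreeSet<>();
        for (long target : targetTimesMs) {
            timers.add(coalesce(target, resolutionMs));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        // Five target times for the same key, all within two consecutive seconds.
        long[] targets = {10_001L, 10_250L, 10_999L, 11_000L, 11_500L};

        // Millisecond resolution: every target keeps its own timer.
        System.out.println(distinctTimers(targets, 1L));

        // One-second resolution: the targets collapse to 10000 and 11000.
        System.out.println(distinctTimers(targets, 1000L));
    }
}
```

With these inputs, the millisecond resolution keeps five timers while the one-second resolution keeps two, and no coalesced time is later than its original target.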
