This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git
The following commit(s) were added to refs/heads/asf-site by this push:
new e11781f4d add low latency techniques blog post part2
e11781f4d is described below
commit e11781f4dd965d0561d1199ae4dd13e7f596afd4
Author: Jun Qin <[email protected]>
AuthorDate: Tue May 17 20:13:28 2022 +0200
add low latency techniques blog post part2
---
_posts/2022-05-23-latency-part2.md | 97 +++++++++++++++++++++
img/blog/2022-05-23-latency-part2/async-io.png | Bin 0 -> 110500 bytes
.../enriching-with-async-io.png | Bin 0 -> 246599 bytes
img/blog/2022-05-23-latency-part2/spread-work.png | Bin 0 -> 153305 bytes
4 files changed, 97 insertions(+)
diff --git a/_posts/2022-05-23-latency-part2.md
b/_posts/2022-05-23-latency-part2.md
new file mode 100644
index 000000000..2a96f1985
--- /dev/null
+++ b/_posts/2022-05-23-latency-part2.md
@@ -0,0 +1,97 @@
+---
+layout: post
+title: "Getting into Low-Latency Gears with Apache Flink - Part Two"
+date: 2022-05-23 00:00:00
+authors:
+- Jun Qin:
+ name: "Jun Qin"
+- Nico Kruber:
+ name: "Nico Kruber"
+excerpt: This multi-part series of blog posts presents a collection of
low-latency techniques in Flink. Following part one, part two continues
with a few more techniques that optimize latency directly.
+---
+
+This series of blog posts presents a collection of low-latency techniques in
Flink. In [part one](https://flink.apache.org/2022/05/18/latency-part1.html),
we discussed the types of latency in Flink and the way we measure end-to-end
latency and presented a few techniques that optimize latency directly. In this
post, we will continue with a few more direct latency optimization techniques.
Just like in part one, for each optimization technique, we will clarify what it
is, when to use it, and [...]
+
+
+# Direct latency optimization
+
+## Spread work across time
+
+When you use timers or do windowing in a job, timer or window firing may
create load spikes due to heavy computation or state access. If the allocated
resources cannot cope with these load spikes, timer or window firing will take
a long time to finish. This often results in high latency.
+
+To avoid this situation, you should change your code to spread out the
workload as much as possible such that you do not accumulate too much work to
be done at a single point in time. In the case of windowing, you should
consider using incremental window aggregation with `AggregateFunction` or
`ReduceFunction`. In the case of timers in a `ProcessFunction`, the operations
executed in the `onTimer()` method should be optimized such that the time spent
there is reduced to a minimum. If you [...]
+
+**You can apply this optimization** if you are using timer-based processing
(e.g., timers, windowing) and an efficient aggregation can be applied whenever
an event arrives instead of waiting for timers to fire.
+
+**Keep in mind** that when you spread work across time, you should consider
not only computation but also state access, especially when using RocksDB.
Spreading one type of work while accumulating the other may result in higher
latencies.
+
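As a rough illustration of why incremental aggregation spreads the work, the following plain-Java sketch (no Flink dependency) contrasts a window that buffers every event with one that only keeps a running sum and count. The class and method names are hypothetical and are not taken from the lab repository; in Flink, the incremental variant would correspond to the accumulator logic of an `AggregateFunction`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch contrasting two ways of computing a window average. */
final class IncrementalAverageSketch {

    /** Buffers every event; all the work (and state access) happens at firing time. */
    static final class BufferingWindow {
        private final List<Double> events = new ArrayList<>();

        void add(double value) {
            events.add(value);
        }

        double fire() {
            double sum = 0;
            for (double v : events) {
                sum += v; // O(n) work concentrated at a single point in time
            }
            return events.isEmpty() ? 0.0 : sum / events.size();
        }
    }

    /** Keeps only a running sum and count; each arriving event does a small amount of work. */
    static final class IncrementalWindow {
        private double sum;
        private long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double fire() {
            return count == 0 ? 0.0 : sum / count; // O(1) work at firing time
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        IncrementalWindow incremental = new IncrementalWindow();
        for (double v : new double[] {1.0, 2.0, 3.0, 4.0}) {
            buffering.add(v);
            incremental.add(v);
        }
        System.out.println(buffering.fire());   // 2.5
        System.out.println(incremental.fire()); // 2.5
    }
}
```

Both variants return the same result. The difference is when the cost is paid: the buffering variant pays it all when the window fires, while the incremental variant pays a small amount per event and keeps far less state.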
+[WindowingJob](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/job/WindowingJob.java)
already does incremental window aggregation with `AggregateFunction`. To show
the latency improvement of this technique, we compared
[WindowingJob](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/job/WindowingJob.java)
with a variant that does not do incremental aggregation,
[WindowingJobNoAggregation](https: [...]
+
+
+<center>
+<img vspace="8" style="width:50%"
src="{{site.baseurl}}/img/blog/2022-05-23-latency-part2/spread-work.png" />
+</center>
+
+
+## Access external systems efficiently
+
+### Using async I/O
+
+When interacting with external systems (e.g., RDBMS, object stores, web
services) in a Flink job for data enrichment, the latency in getting responses
from external systems often dominates the overall latency of the job. With
Flink’s [Async I/O
API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html)
(e.g., `AsyncDataStream.unorderedWait()` or `AsyncDataStream.orderedWait()`),
a single parallel function instance can handle many requests concurrently [...]
+
+<center>
+<img vspace="8" style="width:50%"
src="{{site.baseurl}}/img/blog/2022-05-23-latency-part2/async-io.png" />
+</center>
+
+**You can apply this optimization** if the client of your external system
supports asynchronous requests. If it does not, you can use a thread pool of
multiple clients to handle synchronous requests in parallel. You can also use a
cache to speed up lookups if the data in the external system is not changing
frequently. A cache, however, comes at the cost of working with outdated data.
+
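The thread-pool fallback mentioned above can be sketched in plain Java (no Flink dependency) as follows. The `blockingLookup` method stands in for a hypothetical synchronous client, and all names here are assumptions made for illustration. In a Flink job, the same idea would typically sit inside an async function that completes its result from such a future.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Plain-Java sketch: overlapping the wait times of a synchronous client with a thread pool. */
final class AsyncLookupSketch {

    /** Hypothetical blocking client call; sleeps to simulate a remote round trip. */
    static String blockingLookup(String key) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key + "-enriched";
    }

    /** Submits all lookups to the pool first, then collects the results in input order. */
    static List<String> lookupAll(
            List<String> keys, Function<String, String> client, ExecutorService pool) {
        // Collecting to a list forces every request to be submitted before we wait on any.
        List<CompletableFuture<String>> pending = keys.stream()
                .map(key -> CompletableFuture.supplyAsync(() -> client.apply(key), pool))
                .collect(Collectors.toList());
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> keys = Arrays.asList("a", "b", "c");
            System.out.println(lookupAll(keys, AsyncLookupSketch::blockingLookup, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With enough threads in the pool, the total wait is closer to the slowest single request than to the sum of all requests. The pool size is a tuning knob: too small and requests queue up, too large and the external system may be overloaded.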
+In this experiment, we simulated an external system that returns responses
after a random delay of 1 to 6 ms, and we kept the external system response in a cache
in our job for 1s. The results below show the comparison between two jobs:
[EnrichingJobSync](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/job/EnrichingJobSync.java)
and
[EnrichingJobAsync](https://github.com/ververica/lab-flink-latency/blob/main/src/main/java/com/ververica/lablatency/j
[...]
+
+
+<center>
+<img vspace="8" style="width:50%"
src="{{site.baseurl}}/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png"
/>
+</center>
+
+### Using a streaming join
+
+If you are enriching a stream of events with an external database where the
data changes frequently, and the changes can be converted to a data stream,
then another option is to use [connected
streams]({{site.DOCS_BASE_URL}}flink-docs-stable/docs/dev/datastream/operators/overview/#datastreamdatastream-rarr-connectedstream)
and a
[CoProcessFunction]({{site.DOCS_BASE_URL}}flink-docs-stable/docs/dev/datastream/operators/process_function/#low-level-joins)
to do a streaming join. This [...]
+
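The core idea of such a streaming join can be sketched in plain Java (no Flink dependency): one input updates locally held reference data, and the other input is enriched from that local copy without a remote call. The names below are hypothetical. In Flink, the two methods would roughly correspond to `processElement2()` and `processElement1()` of a `CoProcessFunction`, with the map replaced by keyed state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch of enriching events from a locally maintained copy of reference data. */
final class StreamingJoinSketch {

    // Latest known reference value per key; stands in for Flink keyed state.
    private final Map<String, String> latestByKey = new HashMap<>();

    /** Called for each change record coming from the database change stream. */
    void onReferenceUpdate(String key, String value) {
        latestByKey.put(key, value);
    }

    /** Called for each event; enriches it from local state, or returns empty if no data is known yet. */
    Optional<String> onEvent(String key, String payload) {
        String reference = latestByKey.get(key);
        return reference == null ? Optional.empty() : Optional.of(payload + "|" + reference);
    }

    public static void main(String[] args) {
        StreamingJoinSketch join = new StreamingJoinSketch();
        System.out.println(join.onEvent("user-1", "click"));  // Optional.empty
        join.onReferenceUpdate("user-1", "gold");
        System.out.println(join.onEvent("user-1", "click"));  // Optional[click|gold]
    }
}
```

What to do with events that arrive before their reference data (drop, buffer, or emit unenriched) is a design decision that this sketch leaves open by returning an empty `Optional`.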
+
+## Tune checkpointing
+
+Two aspects of checkpointing impact latency: checkpoint
alignment time and, in the case of end-to-end exactly-once with
transactional sinks, checkpoint frequency and duration.
+
+### Reduce checkpoint alignment time
+
+During checkpoint alignment, operators block the event processing from the
channels where checkpoint barriers have been received in order to wait for the
checkpoint barriers from other channels. Longer alignment time will result in
higher latencies.
+
+There are different ways to reduce checkpoint alignment time:
+
+* Improve the throughput. Any improvement in throughput helps process the
buffers sitting in front of a checkpoint barrier faster.
+* Scale up or scale out. This is the same as the technique of “allocate enough
resources” described in [part
one](https://flink.apache.org/2022/05/18/latency-part1.html). Increased
processing power helps reduce backpressure and checkpoint alignment time.
+* Use unaligned checkpointing. In this case, checkpoint barriers will not wait
until the data is processed but skip over and pass on to the next operator
immediately. Skipped-over data, however, has to be checkpointed as well in
order to be consistent. Flink can also be configured to automatically switch
over from aligned to unaligned checkpointing after a certain alignment time has
passed.
+* Buffer less data. You can reduce the buffered data size by tuning the number
of exclusive and floating buffers. With less data buffered in the network
stack, the checkpoint barrier can arrive at operators quicker. However,
reducing buffers has an adverse effect on throughput and is just mentioned here
for completeness. Flink 1.14 improves buffer handling by introducing a feature
called *buffer debloating*. Buffer debloating can dynamically adjust buffer
size based on the current throug [...]
+
+
+### Tune checkpoint duration and frequency
+
+If you are working with transactional sinks with exactly-once semantics, the
output events are committed to external systems (e.g., Kafka) *only* upon
checkpoint completion. In this case, tuning other options may not help if you
do not tune checkpointing. Instead, you need to have fast and more frequent
checkpointing.
+
+To have fast checkpointing, you need to reduce the checkpoint duration. To
achieve that, you can, for example, turn on RocksDB incremental checkpointing,
reduce the state stored in Flink, clean up state that is not needed anymore,
avoid putting caches into managed state, store only necessary fields in state,
optimize the serialization format, etc. You can also scale up or scale out,
same as the technique of “allocate enough resources” described in [part
one](https://flink.apache.org/2022/05/ [...]
+
+To have more frequent checkpointing, you can reduce the checkpoint interval,
reduce the minimum pause between checkpoints, or use concurrent checkpoints. But keep
in mind that concurrent checkpoints introduce more runtime overhead.
+
+Another option is to switch from exactly-once sinks to at-least-once
sinks. As a result, you may get (correct but) duplicated output
events, so the downstream application that consumes the output
events of your jobs may need to perform deduplication.
+
+
+## Process events on arrival
+In a stream processing pipeline, there often exists a delay between the time
an event is received and the time the event can be processed (e.g., after
having seen all events up to a certain point in event time). The amount of
delay may be significant for those pipelines with very low latency
requirements. For example, a fraud detection job usually requires a sub-second
level of latency. In this case, you could process events with
[ProcessFunction]({{site.DOCS_BASE_URL}}flink-docs-stable/ [...]
+
+**You can apply this optimization** if your job has a sub-second level latency
requirement (e.g., hundreds of milliseconds) and the reduced watermarking
interval still contributes a significant part of the latency.
+
+**Keep in mind** that this may change your job logic considerably since you
have to deal with out-of-order events by yourself.
+
+# Summary
+
+Following part one, this blog post presented a few more latency optimization
techniques with a focus on direct latency optimization. In the next part, we
will focus on techniques that optimize latency by increasing throughput. Stay
tuned!
+
diff --git a/img/blog/2022-05-23-latency-part2/async-io.png
b/img/blog/2022-05-23-latency-part2/async-io.png
new file mode 100644
index 000000000..b1d836133
Binary files /dev/null and b/img/blog/2022-05-23-latency-part2/async-io.png
differ
diff --git a/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png
b/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png
new file mode 100644
index 000000000..fa11bf98c
Binary files /dev/null and
b/img/blog/2022-05-23-latency-part2/enriching-with-async-io.png differ
diff --git a/img/blog/2022-05-23-latency-part2/spread-work.png
b/img/blog/2022-05-23-latency-part2/spread-work.png
new file mode 100644
index 000000000..e0c40c5d0
Binary files /dev/null and b/img/blog/2022-05-23-latency-part2/spread-work.png
differ