infoverload commented on a change in pull request #454:
URL: https://github.com/apache/flink-web/pull/454#discussion_r663693379



##########
File path: _posts/2021-07-07-backpressure.md
##########
@@ -0,0 +1,189 @@
+---
+layout: post
+title:  "How to identify the source of backpressure?"
+date: 2021-07-07T00:00:00.000Z
+authors:
+- pnowojski:
+  name: "Piotr Nowojski"
+  twitter: "PiotrNowojski"
+excerpt: Apache Flink 1.13 introduced a couple of important changes in the 
area of backpressure monitoring and performance analysing of Flink Jobs. This 
blog post aims to introduce those changes and explain how to use them.

Review comment:
       ```suggestion
   excerpt: Apache Flink 1.13 introduced a couple of important changes in the 
area of backpressure monitoring and performance analysis of Flink Jobs. This 
blog post aims to introduce those changes and explain how to use them.
   ```

##########
File path: _posts/2021-07-07-backpressure.md
##########
@@ -0,0 +1,189 @@
+---
+layout: post
+title:  "How to identify the source of backpressure?"
+date: 2021-07-07T00:00:00.000Z
+authors:
+- pnowojski:
+  name: "Piotr Nowojski"
+  twitter: "PiotrNowojski"
+excerpt: Apache Flink 1.13 introduced a couple of important changes in the 
area of backpressure monitoring and performance analysing of Flink Jobs. This 
blog post aims to introduce those changes and explain how to use them.
+---
+
+{% toc %}
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl }}/img/blog/2021-07-07-backpressure/animated.png" 
alt="Backpressure monitoring in the web UI"/>
+       <p class="align-center">Backpressure monitoring in the web UI</p>
+</div>
+
+The backpressure topic has been tackled from different angles over the last couple
of years. However, when it comes 
+to identifying and analyzing sources of backpressure, things have changed 
quite a bit in the recent Flink releases 
+(especially with new additions to metrics and the web UI in Flink 1.13). This 
post will try to clarify some of 
+these changes and go into more detail about how to track down the source of 
backpressure, but first...
+
+## What is backpressure?
+
+This has been explained very well in an old, but still accurate, [post by Ufuk 
Celebi](https://www.ververica.com/blog/how-flink-handles-backpressure).
+I highly recommend reading it if you are not familiar with this concept. For a 
much deeper and low-level understanding of
+the topic and how Flink’s network stack works, there is a more [advanced 
explanation available 
here](https://alibabacloud.com/blog/analysis-of-network-flow-control-and-back-pressure-flink-advanced-tutorials_596632).
+
+At a high level, backpressure happens if some operator(s) in the Job Graph 
cannot process records at the
+same rate as they are received. This fills up the input buffers of the subtask 
that is running this slow operator.
+Once the input buffers are full, backpressure propagates to the output buffers 
of the upstream subtasks.
+Once those are filled up, the upstream subtasks are also forced to slow down their record processing
+rate to match the processing rate of the downstream operator causing this bottleneck. Backpressure
+further propagates up the stream until it reaches the source operators.
+
+As long as the load and available resources are static and none of the 
operators produce short bursts of
+data (like windowing operators), those input/output buffers should only be in 
one of two states: almost empty
+or almost full. If the downstream operator or subtask is able to keep up with 
the influx of data, the
+buffers will be empty. If not, then the buffers will be full [<sup>1</sup>]. 
In fact, checking the buffers’ usage metrics
+was the basis of the previously recommended way to detect and analyze backpressure, described [a couple
+of years back by Nico 
Kruber](https://flink.apache.org/2019/07/23/flink-network-stack-2.html#backpressure).
+As I mentioned in the beginning, Flink now offers much better tools to do the 
same job, but before we get to that,
+there are two questions worth asking.  
+
+### Why should I care about backpressure?
+
+Backpressure is an indicator that your machines or operators are overloaded. 
The buildup of backpressure
+directly affects the end-to-end latency of the system, as records are waiting 
longer in the queues before
+being processed. Secondly, aligned checkpointing takes longer with 
backpressure, while unaligned checkpoints
+will be larger (you can read more about aligned and unaligned checkpoints [in the documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/stateful-stream-processing/#checkpointing)).
+If you are struggling with checkpoint barrier propagation times, taking care of backpressure would most
+likely help to solve the problem. Lastly, you might just want to optimize your job in order to reduce
+the costs of running it.
+
+In order to address the problem in any of those cases, one first needs to be aware of it, and then
+locate and analyze its source.
+
+### Why shouldn’t I care about backpressure?
+
+Frankly, you do not always have to care about the presence of backpressure. Almost by definition, lack
+of backpressure means that your cluster is at least ever so slightly underutilized and over-provisioned.
+If you want to minimize idling resources, you probably cannot avoid incurring some backpressure. This
+is especially true for batch processing.
+
+## How to detect and track down the source of backpressure?
+
+One way to detect backpressure is to use [metrics](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#system-metrics);
+however, in Flink 1.13 it’s no longer necessary to dig so deep. In most cases, it should be enough to
+just look at the job graph in the web UI.
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/2021-07-07-backpressure/simple-example.png"/>
+</div>
+
+The first thing to note in the example above is that different tasks have 
different colors. Those colors
+represent a combination of two factors: how backpressured the task is and how busy it is. Idling
+tasks will be blue, fully busy tasks will be red hot, and fully backpressured 
tasks will be black. Anything
+in between will be a combination/shade of those three colors. With this 
knowledge, one can easily spot the
+backpressured tasks (black). The busiest (red) task downstream of the 
backpressured tasks will most likely
+be the source of the backpressure (the bottleneck).
+
+If you click on one particular task and go into the “BackPressure” tab, you will be able to further
+dissect the problem and check the busy/backpressured/idle status of every subtask in that task. For
+example, this is especially handy if there is data skew and not all subtasks are equally utilized.
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl }}/img/blog/2021-07-07-backpressure/subtasks.png" 
alt="Backpressure among subtasks"/>
+       <p class="align-center">Backpressure among subtasks</p>
+</div>
+
+In the above example, we can clearly see which subtasks are idling, which are 
backpressured, and that
+none of them are busy. And frankly, in a nutshell, that should be enough to quickly understand what is
+happening with your job :) However, there are a couple more details worth explaining.
+
+### What are those numbers?
+
+If you are curious how it works underneath, we can go a little deeper. At the 
base of this new mechanism
+we have three [new 
metrics](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#io)
+that are exposed and calculated by each subtask:
+- `idleTimeMsPerSecond`
+- `busyTimeMsPerSecond`
+- `backPressuredTimeMsPerSecond`
+
+Each of them measures the average time in milliseconds per second that the subtask spent being idle,
+busy, or backpressured, respectively. Apart from some rounding errors, they should complement each other
+and add up to 1000ms/s. In essence, they are quite similar to, for example, CPU usage metrics.
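
If you would rather read those numbers programmatically than from the web UI, they can also be fetched
through Flink's monitoring REST API. Below is a minimal sketch of such a query, assuming a locally
reachable REST endpoint on port 8081; the job and vertex IDs are placeholders that you would need to look
up first (for example via `/jobs` and `/jobs/<job-id>`), and the exact query parameters are described in
the REST API documentation.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BackpressureMetricsProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder values -- replace with your REST address, job ID and vertex ID.
        String restAddress = "http://localhost:8081";
        String jobId = "<job-id>";
        String vertexId = "<vertex-id>";

        // Ask for the per-subtask metrics aggregated with "max", which mirrors
        // what the job graph in the web UI shows for a whole task.
        String url = restAddress + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/subtasks/metrics"
                + "?get=busyTimeMsPerSecond,idleTimeMsPerSecond,backPressuredTimeMsPerSecond"
                + "&agg=max";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());

        // The values are reported in ms/s, so anything close to 1000 means the
        // subtask is (almost) fully busy, idle, or backpressured.
        System.out.println(response.body());
    }
}
```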
+
+Another important detail is that they are being averaged over a short period 
of time (a couple of seconds)
+and they take into account everything that is happening inside the subtask’s 
thread: operators, functions,
+timers, checkpointing, records serialization/deserialization, network stack, 
and other Flink internal
+overheads. A `WindowOperator` that is busy firing timers and producing results 
will be reported as busy or backpressured.
+A function doing some expensive computation in the `CheckpointedFunction#snapshotState` call, for instance flushing
+internal buffers, will also be reported as busy. 
+
+One limitation, however, is that the `busyTimeMsPerSecond` and `idleTimeMsPerSecond` metrics are oblivious
+to anything that is happening in separate threads, outside of the main 
subtask’s execution loop.
+Fortunately, this is only relevant for two cases:
+- Custom threads that you manually spawn in your operators (a discouraged 
practice).
+- Old-style sources that implement the deprecated `SourceFunction` interface. 
Such sources will report `NaN`/`N/A`
+as the value for `busyTimeMsPerSecond`. For more information on the topic of Data Sources, please
+[take a look here](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/sources/).
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/2021-07-07-backpressure/source-task-busy.png" alt="Old-style 
sources do not report busy time"/>
+       <p class="align-center">Old-style sources do not report busy time</p>
+</div>
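
As a side note, if your job still uses such an old-style source added via
`StreamExecutionEnvironment#addSource`, migrating it to the new, FLIP-27 based `Source` interface should
also let the source task report busy time properly. The following is only a rough sketch, assuming
Flink 1.13+ with the `flink-connector-kafka` dependency on the classpath; the broker address, topic, and
group ID are placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewSourceApiSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // New, FLIP-27 style source (placeholder broker address and topic).
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("backpressure-demo")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // An old-style env.addSource(new SomeSourceFunction()) would report NaN busy time;
        // fromSource uses the new Source interface instead.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();

        env.execute("New Source API sketch");
    }
}
```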
+
+In order to present those raw numbers in the web UI, the metrics need to be aggregated from all subtasks
+(on the job graph we are showing only tasks). This is why the web UI presents the maximal value from all
+subtasks of a given task and why the aggregated maximal values of busy and backpressured may not add up to 100%.
+One subtask can be backpressured at 60%, while another can be busy at 60%. This can result in a task that
+is displayed as both backpressured and busy at 60%.
+
+### Varying load
+
+There is one more thing. Do you remember that those metrics are measured and 
averaged over a couple of seconds?
+Keep this in mind when analyzing jobs or tasks with varying load, such as 
(sub)tasks containing a `WindowOperator`
+that is firing periodically. Both the subtask with a constant load of 50% and 
the subtask that alternates every
+second between being fully busy and fully idle will report the same `busyTimeMsPerSecond` value
+of 500ms/s.
+
+Furthermore, varying load and especially firing windows can move the 
bottleneck to a different place in
+the job graph:
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/2021-07-07-backpressure/bottleneck-zoom.png" alt="Bottleneck 
alternating between two tasks"/>
+       <p class="align-center">Bottleneck alternating between two tasks</p>
+</div>
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/2021-07-07-backpressure/sliding-window.png" 
alt="SlidingWindowOperator"/>
+       <p class="align-center">SlidingWindowOperator</p>
+</div>
+
+In this particular example, `SlidingWindowOperator` is the bottleneck as long as it is accumulating records.
+However, as soon as it starts to fire its windows (once every 10 seconds), the 
downstream task
+`SlidingWindowCheckMapper -> Sink: SlidingWindowCheckPrintSink` becomes the 
bottleneck and `SlidingWindowOperator`
+gets backpressured. As those busy/backpressured/idle metrics are averaging 
time over a couple of seconds,
+this subtlety is not immediately visible and has to be read between the lines. 
On top of that, the web UI
+updates its state only once every 10 seconds, which makes spotting more frequent changes a bit more difficult.
+
+## What can I do with backpressure?
+
+In general, this is a complex topic that is worthy of a dedicated blog post. It
was, to a certain extent,
+addressed in [previous blog 
posts](https://flink.apache.org/2019/07/23/flink-network-stack-2.html#:~:text=this%20is%20unnecessary.-,What%20to%20do%20with%20Backpressure%3F,-Assuming%20that%20you).
+In short, there are two high-level ways of dealing with backpressure. Either 
add more resources (more machines,
+faster CPUs, more RAM, better network, SSDs…) or optimize usage of the resources you already have
+(optimize the code, tune the configuration, avoid data skew). In either case, 
you first need to analyze
+what is causing backpressure by:
+1. Identifying the presence of backpressure.
+2. Locating which subtask(s) or machines are causing it.
+3. Digging deeper into what part of the code is causing it and which resource 
is scarce.
+
+Backpressure monitoring improvements and metrics can help you with the first 
two points. To tackle the
+last one, profiling the code can be the way to go. To help with profiling, starting from Flink 1.13,
+[Flame Graphs](http://www.brendangregg.com/flamegraphs.html) are also [integrated into Flink's web UI](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/).
+Flame Graphs are a well-known profiling and visualization technique, and I encourage you to give them a try.
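
For completeness: to the best of my knowledge, the Flame Graph feature is disabled by default in
Flink 1.13 and has to be switched on explicitly, for example via the following entry in `flink-conf.yaml`
(please double-check the linked documentation for your exact version):

```yaml
# Enable the experimental Flame Graph feature in the web UI (Flink 1.13+).
rest.flamegraph.enabled: true
```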
+
+But keep in mind that after locating where the bottleneck is, you can analyze 
it the same way you would
+any other non-distributed application (by checking resource utilization, attaching a profiler, etc.).
+Usually, there is no silver bullet for problems like this. You can try to scale up, but sometimes it
+might not be easy or practical to do.
+
+Anyway... The aforementioned improvements to backpressure monitoring allow us 
to easily detect the source of backpressure,

Review comment:
       ```suggestion
   The aforementioned improvements to backpressure monitoring allow us to easily detect the source of backpressure,
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

