..  Maybe we should add a way to register those threads such that they are
also sampled.  Thoughts?

On Thu, Jan 3, 2019 at 10:25 AM Jamie Grier <jgr...@lyft.com> wrote:

> One unfortunate problem with the current back-pressure detection mechanism
> is that it doesn't work well with all of our sources.  The problem is that
> some sources (Kinesis for sure) emit elements from threads Flink knows
> nothing about and therefore those stack traces aren't sampled.  The result
> is that you never see back-pressure detected in the first chain of a Flink
> job containing that source.
>
> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski <pi...@da-platform.com>
> wrote:
>
>> Hi all,
>>
>> peiliping: I think your idea could be problematic for a couple of reasons.
>> A probably minor concern is that checkpoint time could be affected not only
>> by back pressure, but also by how long it takes to
>> actually perform the checkpoint. The bigger issues are that this bottleneck
>> detection would work only during checkpointing (what if one has
>> checkpoints only once every hour? Or none at all?) and that
>> performance/bottlenecks may change significantly during checkpointing (for
>> example, writing state for the first operator to DFS can indirectly affect
>> downstream operators).
>>
>> The idea of detecting back pressure/bottlenecks using output/input
>> buffers is much more natural because, in the end, almost by definition, if
>> the output buffers are full, the given task is back
>> pressured.
>>
>> Both input and output queue lengths are already exposed via metrics, so
>> developers have access to the raw data to manually calculate/detect
>> bottlenecks. It would actually be nice to automatically aggregate those
>> metrics and provide ready-to-use metrics: boolean flags indicating whether
>> a task/stage/job is back pressured or not.
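A minimal sketch of the aggregation described above, in Python for brevity. The data shape (a queue length and capacity per task) and the "any task / any stage" roll-up rule are assumptions made for illustration; they are not taken from Flink's metric system.

```python
from typing import Dict, List, Tuple

# (output_queue_length, output_queue_capacity) for one task; hypothetical shape
QueueSample = Tuple[int, int]


def task_back_pressured(sample: QueueSample) -> bool:
    """A task counts as back pressured when its output queue is full."""
    length, capacity = sample
    return capacity > 0 and length >= capacity


def stage_back_pressured(samples: List[QueueSample]) -> bool:
    """Assumed rule: a stage is back pressured if any of its tasks is."""
    return any(task_back_pressured(s) for s in samples)


def job_back_pressured(stages: Dict[str, List[QueueSample]]) -> bool:
    """Assumed rule: a job is back pressured if any of its stages is."""
    return any(stage_back_pressured(samples) for samples in stages.values())


# Made-up sample values: the first "source" task has a full output queue.
stages = {"source": [(8, 8), (3, 8)], "map": [(1, 8), (0, 8)]}
flags = {name: stage_back_pressured(samples) for name, samples in stages.items()}
```

With these sample values, `flags` marks only the `source` stage, and `job_back_pressured(stages)` is true. A stricter roll-up (for example, requiring all tasks of a stage to be full) would be an equally plausible choice.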
>>
>> Replacing the current back pressure detection mechanism, which probes the
>> threads and checks which of them are waiting for buffers, is another issue.
>> Functionally it is equivalent to monitoring whether the output queues are
>> full. The current approach might be more hacky, but it gives the same results, so
>> changing/refactoring it wasn’t high on my priority list. It would be nice to
>> clean this up a little bit and unify it, but using metrics can also mean some
>> additional work, since there are some known metrics-related performance
>> issues.
>>
>> Piotrek
>>
>> > On 3 Jan 2019, at 10:35, peiliping <peiliping...@gmail.com> wrote:
>> >
>> > I have some ideas about detecting backpressure (the blocking
>> operators) using checkpoint barriers.
>> >
>> > I have some Flink jobs with checkpointing enabled, but their checkpoints
>> take a long time to complete.
>> >
>> > I need to find the blocking operators, which is the same problem as
>> backpressure detection.
>> >
>> > From a checkpoint object I can get a timestamp that marks the
>> start time, and then I compute a metric in
>> >
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing() .
>> >
>> > The metric is the delta between checkpoint.timestamp and the time
>> when StreamTask.executeCheckpointing is invoked,
>> >
>> > and I named it checkpoint-delay-time.
>> >
>> > It looks like the end-to-end-time metric of a checkpoint but does not include
>> the async handles.
>> >
>> > For example, take a list of tasks A (parallelism: 2) ---> B (parallelism: 3)
>> ---> C (parallelism: 1).
>> >
>> > Checkpoint-delay-value-A: the max checkpoint-delay-time from
>> A (there are 2 instances)
>> >
>> > Checkpoint-delay-value-B: the max checkpoint-delay-time from
>> B (there are 3 instances)
>> >
>> > Checkpoint-delay-value-C: the max checkpoint-delay-time from
>> C (there is 1 instance)
>> >
>> > Then I can derive 3 more deltas from the checkpoint-delay-values:
>> >
>> > result-0-->A  = Checkpoint-delay-value-A  -  0
>> >
>> > result-A-->B = Checkpoint-delay-value-B  -  Checkpoint-delay-value-A
>> >
>> > result-B-->C = Checkpoint-delay-value-C  -  Checkpoint-delay-value-B
>> >
>> > Any result-X-->Y that is longer than 5s (or some other
>> threshold) should be the black sheep.
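The calculation above can be sketched as follows, in Python for brevity. The function names and the sample delay values are made up for illustration; only the max-per-task step, the differences between consecutive tasks, and the 5s threshold come from the message.

```python
from typing import Dict, List, Tuple


def per_stage_deltas(
    delays_ms: Dict[str, List[int]], order: List[str]
) -> List[Tuple[str, int]]:
    """Return (edge, delta_ms) for each hop, using the max delay per task."""
    deltas = []
    previous_name, previous_max = "0", 0
    for name in order:
        current_max = max(delays_ms[name])  # slowest instance of this task
        deltas.append((f"{previous_name}-->{name}", current_max - previous_max))
        previous_name, previous_max = name, current_max
    return deltas


def suspects(deltas: List[Tuple[str, int]], threshold_ms: int = 5000) -> List[str]:
    """Edges whose delta exceeds the threshold (5s in the message above)."""
    return [edge for edge, delta in deltas if delta > threshold_ms]


# Made-up checkpoint-delay-time samples in milliseconds, one per instance.
sample = {"A": [120, 300], "B": [6500, 900, 4100], "C": [7000]}
deltas = per_stage_deltas(sample, ["A", "B", "C"])
black_sheep = suspects(deltas)
```

For these sample values the deltas are 300 ms, 6200 ms and 500 ms, so only the hop from A to B exceeds the threshold.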
>> >
>> >
>> >
>> >
>> >
>> > On 2019/1/3 at 2:43 PM, Yun Gao wrote:
>> >> Hello liping,
>> >>
>> >>        Thank you for proposing to optimize the backpressure detection!
>> From our previous experience, we think the InputBufferPoolUsageGauge and
>> OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a
>> list of tasks A ---> B ---> C, if we find that the OutputBufferPoolUsage
>> of task A and the InputBufferPoolUsage of task B are both 100%, but the
>> OutputBufferPoolUsage of task B is less than 100%, then it should be
>> task B that causes the backpressure.
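A minimal sketch of the rule described above, in Python for brevity. The dictionaries of usage values (as fractions between 0 and 1) and the function name are hypothetical; in practice the numbers would come from the InputBufferPoolUsage and OutputBufferPoolUsage gauges.

```python
from typing import Dict, List, Optional


def find_bottleneck(
    order: List[str],
    in_usage: Dict[str, float],
    out_usage: Dict[str, float],
    full: float = 1.0,
) -> Optional[str]:
    """Return the first task whose upstream output pool and own input pool
    are full while its own output pool is not, or None if there is none."""
    for upstream, task in zip(order, order[1:]):
        if (
            out_usage[upstream] >= full
            and in_usage[task] >= full
            and out_usage[task] < full
        ):
            return task
    return None


# Made-up usage values for A ---> B ---> C.
in_usage = {"A": 0.0, "B": 1.0, "C": 0.2}
out_usage = {"A": 1.0, "B": 0.4, "C": 0.0}
culprit = find_bottleneck(["A", "B", "C"], in_usage, out_usage)
```

With these values `culprit` is `"B"`. The sketch only checks tasks that have an upstream task, so it cannot flag a source as the bottleneck.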
>> >>
>> >>       However, we currently think that the InputBufferPoolUsage and
>> OutputBufferPoolUsage require some modifications to be more accurate:
>> >>          1. When there are multiple inputs or outputs, the
>> InputBufferPoolUsage and OutputBufferPoolUsage should show the maximum
>> usage instead of the average usage [1].
>> >>          2. Currently the sender side reports the backlog right before
>> filling the output buffer. Together with the pre-allocation logic on the
>> receiver side, the InputBufferPoolUsage may be 100% even if the data has
>> not been received yet [2].
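A small illustration of point 1, in Python for brevity, with made-up usage values: when a task has two outputs and only one of them is full, the average hides the full pool while the maximum exposes it.

```python
from statistics import mean
from typing import List


def average_usage(usages: List[float]) -> float:
    """Average usage across all pools of a task."""
    return mean(usages)


def max_usage(usages: List[float]) -> float:
    """Usage of the fullest pool of a task."""
    return max(usages)


# Made-up values: one output pool is completely full, the other is empty.
usages = [1.0, 0.0]
avg = average_usage(usages)
peak = max_usage(usages)
```

Here the average is 0.5, which would not trip a "100% full" check, while the maximum is 1.0.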
>> >>
>> >>      We may need to address these problems before adopting the
>> InputBufferPoolUsage  and OutputBufferPoolUsage as the backpressure
>> indicator.
>> >>
>> >>      Besides, another similar thought is that we may also add new
>> InputBufferUsage and OutputBufferUsage metrics to show (number of queued
>> buffers / number of all buffers) instead.
>> >>
>> >>
>> >>     Best,
>> >>    Yun Gao
>> >>
>> >>
>> >>     [1] https://issues.apache.org/jira/browse/FLINK-10981
>> >>     [2] https://issues.apache.org/jira/browse/FLINK-11082
>> >>
>> >>
>> >> ------------------------------------------------------------------
>> >> From:裴立平 <peiliping...@gmail.com>
>> >> Send Time:2019 Jan. 3 (Thu.) 13:39
>> >> To:dev <dev@flink.apache.org>
>> >> Subject:[DISCUSS] Detection Flink Backpressure
>> >>
>> >> Recently I have wanted to optimize the way we find the positions where
>> >> backpressure occurs.
>> >>
>> >> I read some blogs about Flink backpressure and have a rough idea of it.
>> >>
>> >> The method Flink has adopted is thread stack sampling; it's heavyweight and
>> >> not persistent.
>> >>
>> >> The positions where backpressure occurs are very important to
>> >> developers.
>> >>
>> >> They should be treated as monitoring metrics.
>> >>
>> >> Is there any other approach we can take to detect Flink backpressure?
>> >>
>> >
>>
>>
