[
https://issues.apache.org/jira/browse/FLINK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521539#comment-17521539
]
Piotr Nowojski edited comment on FLINK-24578 at 4/13/22 8:28 AM:
-----------------------------------------------------------------
I've spent a couple of days thinking about this problem while running a simple
benchmark with an artificial data source, 5 keyBy's and no data skew, and these
are my thoughts: a plausible explanation behind this ~10% performance drop when
buffer debloating is enabled.
A couple of facts:
# With buffer debloating ("w bd"), some of the throttling sink subtasks randomly
end up idling and wasting cycles (slow subtasks) while others are 100% busy
(fast subtasks); this happens less frequently without buffer debloating ("w/o bd").
# Slow and fast subtasks stay in that state for the whole duration of the job.
# Several metrics correlate with whether a subtask is slow or fast:
a) Slow subtasks have a slightly higher numRecordsIn - they stay a couple of
tens of thousands of records ahead of the fast subtasks, and the difference
remains more or less constant for the whole duration of the job.
b) Slow subtasks have fewer enqueued input buffers/buffers in use, for example
~40 vs ~160 for fast subtasks.
c) Slow subtasks have 80-140ms of idle time, while fast subtasks have 0ms idle
time most of the time.
d) The debloated buffer size for slow subtasks is 32kB, while fast subtasks end
up with ~6kB (a result of very similar throughput but different buffersInUse
counts; a rough sketch of this calculation follows right after this list).
e) Slow subtasks have an inputQueueSize close to zero, i.e. their input buffers
are almost/mostly empty.
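Side note: below is a minimal, back-of-envelope sketch of the debloating
calculation that produces those two buffer sizes, assuming the documented
FLIP-183 approach of throughput * target latency spread over the buffers in use
and clamped to the 32kB segment size. The throughput value and the 1s target
are illustrative assumptions, not measurements from this benchmark.
{code:java}
// Simplified model of the buffer debloating size calculation (FLIP-183), not the
// actual BufferDebloater code: desired in-flight bytes = throughput * target time,
// divided by the buffers currently in use, clamped to [256 B, 32 kB segment size].
public class DebloatSizeSketch {

    static final int MAX_BUFFER_SIZE = 32 * 1024;
    static final int MIN_BUFFER_SIZE = 256;

    static int desiredBufferSize(long throughputBytesPerSec, double targetSec, int buffersInUse) {
        long totalDesiredBytes = (long) (throughputBytesPerSec * targetSec);
        long perBuffer = totalDesiredBytes / Math.max(1, buffersInUse);
        return (int) Math.min(MAX_BUFFER_SIZE, Math.max(MIN_BUFFER_SIZE, perBuffer));
    }

    public static void main(String[] args) {
        long throughput = 1_500_000; // ~1.5 MB/s per gate, purely illustrative
        double target = 1.0;         // default debloat target of 1 s

        // "slow" subtask: ~40 buffers in use -> computed size hits the 32 kB cap
        System.out.println(desiredBufferSize(throughput, target, 40));  // 32768
        // "fast" subtask: ~160 buffers in use -> ~9 kB (same ballpark as the observed ~6 kB)
        System.out.println(desiredBufferSize(throughput, target, 160)); // 9375
    }
}
{code}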
What I think is happening is as follows:
* During startup, some subtasks start a bit sooner than others. If we limit
ourselves to parallelism 2: until both sink subtasks are running, the job is
fully backpressured. Once the first sink boots up, it consumes all of the
buffered in-flight data addressed to it, but the job remains fully
backpressured, because the upstream subtasks are backpressured on the
channels/subpartitions waiting for the second sink to boot up.
* I think this is what causes the initial differentiation between slow and fast
subtasks. The "slow" subtasks are the ones that actually boot up first; the
"fast" ones are those that boot up last. The "slow" subtasks end up slightly
ahead of the "fast" ones in the consumption of records, hence the difference in
the numRecordsIn metric. Once the "fast" subtasks finally boot up, they have
tons of buffered in-flight data to consume on all input channels, while the
"slow" subtasks have mostly empty input buffers. This makes the buffer
debloating mechanism assign larger desired buffer sizes to the "slow" subtasks,
while the "fast" ones get smaller buffer sizes.
* This state is stable and perpetuates itself; to see why, look at the upstream
subtasks. They are still backpressured on the subpartitions leading to the
"fast" sinks. The sinks keep perpetuating this state, because the subpartitions
on which the upstream subtasks are backpressured are forced to limit their
buffer size, while the non-backpressured subpartitions are granted even larger
buffer sizes - in effect, the subpartitions with the larger buffer sizes are
never backpressured (a rough sense of scale follows right after this list).
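For a rough sense of scale of that stable, skewed state: both paths end up with
a comparable total buffer capacity, roughly the default 1s debloat target's
worth at an assumed throughput, even though the per-buffer sizes differ by a
factor of ~5. The buffer counts and sizes are taken from the observations
above; the throughput number is an assumption.
{code:java}
// Back-of-envelope: total buffer capacity (buffers in use * announced buffer size)
// on the "slow" and "fast" paths, and how many seconds of data that corresponds to
// at an assumed throughput. Numbers are illustrative, taken from the facts above.
public class InFlightCapacitySketch {
    public static void main(String[] args) {
        long throughput = 1_500_000;            // bytes/s per gate, assumed

        long slowCapacity = 40L * 32 * 1024;    // ~40 buffers * 32 kB ≈ 1.25 MB
        long fastCapacity = 160L * 6 * 1024;    // ~160 buffers * 6 kB ≈ 0.94 MB

        System.out.printf("slow path: %d bytes ≈ %.2f s worth of data%n",
                slowCapacity, (double) slowCapacity / throughput);  // ≈ 0.87 s
        System.out.printf("fast path: %d bytes ≈ %.2f s worth of data%n",
                fastCapacity, (double) fastCapacity / throughput);  // ≈ 0.66 s
    }
}
{code}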
However, all in all, I think this is not The Problem.
Now think about a scenario with a perfect distribution of records. I think we
would end up in a state where all subtasks consume records equally quickly and
have exactly the same numRecordsInRate, with the "slow" subtasks just slightly
ahead of the "fast" ones. Since the subpartitions with the smaller buffer sizes
(the ones leading to the "fast" sinks) can fit fewer records, they would always
be the ones causing the backpressure. But with a perfect distribution of
records I think this wouldn't result in wasted resources on the downstream nodes.
I think The Problem happens because of slight, random, intermittent
hiccups/data skews. If anything like that happens on the "slow"/"large"
subpartitions, it is unlikely to affect the job as a whole, since the upstream
subtasks have some available space in the "slow" subpartitions to smooth out
those hiccups/skews (remember that the "slow" subpartitions end up with larger
buffer sizes). However, if a hiccup/data skew happens for a brief moment on a
"fast" subtask, it backpressures and stalls the upstream subtask, which in turn
causes a stall and idle time on the "slow" sinks, since those don't have much
(if any) buffered data to smooth out the hiccup.
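To put rough, purely illustrative numbers on that smoothing capacity (the
free-buffer counts and the write rate below are assumptions, not measurements):
{code:java}
// Back-of-envelope: how long an upstream subtask can keep writing into one
// subpartition during a downstream hiccup before it runs out of space and stalls.
public class HiccupHeadroomSketch {

    static double stallAfterMs(long freeBytes, long writeRateBytesPerSec) {
        return 1000.0 * freeBytes / writeRateBytesPerSec;
    }

    public static void main(String[] args) {
        long writeRate = 1_500_000; // bytes/s pushed into one subpartition, assumed

        // "slow"/"large" subpartition: 32 kB buffers and mostly empty queues,
        // assume ~4 free buffers available to the writer
        long slowFree = 4L * 32 * 1024;
        // "fast"/"small" subpartition: 6 kB buffers and already backpressured,
        // assume at most ~1 free buffer at any given moment
        long fastFree = 1L * 6 * 1024;

        System.out.printf("slow subpartition absorbs ~%.0f ms of hiccup%n",
                stallAfterMs(slowFree, writeRate)); // ~87 ms
        System.out.printf("fast subpartition absorbs ~%.0f ms of hiccup%n",
                stallAfterMs(fastFree, writeRate)); // ~4 ms
    }
}
{code}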
What's worse is that this might not be just a "buffer debloating" problem. It
looks like a general problem of being able (or not) to smooth out
hiccups/intermittent data skew: I have seen exactly the same throughput with
buffer debloating disabled but the buffer size manually reduced to 8kB as with
buffer debloating enabled and "fast" subpartitions at ~6kB buffer size.
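For reference, a hypothetical sketch of the "no debloating, small fixed
buffers" setup used for that comparison, expressed via Flink's Configuration
(the same keys could go into flink-conf.yaml). I'm quoting the option names
from memory, so treat this as an illustration rather than an exact recipe.
{code:java}
import org.apache.flink.configuration.Configuration;

// Hypothetical helper building the "debloating off, 8 kB fixed buffers" configuration.
public class FixedSmallBufferConfig {
    public static Configuration build() {
        Configuration conf = new Configuration();
        // Turn off the adaptive buffer size calculation (FLIP-183 buffer debloating).
        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "false");
        // Shrink the fixed network buffer (memory segment) size from the 32kb default.
        conf.setString("taskmanager.memory.segment-size", "8kb");
        return conf;
    }
}
{code}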
> Unexpected erratic load shape for channel skew load profile and ~10%
> performance loss with enabled debloating
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24578
> URL: https://issues.apache.org/jira/browse/FLINK-24578
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0
> Reporter: Anton Kalashnikov
> Priority: Major
> Attachments: antiphaseBufferSize.png, erraticBufferSize1.png,
> erraticBufferSize2.png
>
>
> given:
> A job with 5 maps (with keyBy).
> All channels are remote. Parallelism is 80.
> The first task produces only two keys - `indexOfThisSubtask` and
> `indexOfThisSubtask + 1`. So every subTask has a constant number of active
> channels (depending on the hash distribution).
> Every record has an equal size and takes an equal time to process.
>
> when:
> The buffer debloat is enabled with the default configuration.
>
> then:
> For some reason, the buffer size synchronizes across all subTasks of the first
> map. The synchronization can be strong, as shown in the erraticBufferSize1
> picture, but usually it is less pronounced, as in erraticBufferSize2.
> !erraticBufferSize1.png!
> !erraticBufferSize2.png!
>
> Expected:
> After the stabilization period, the buffer size should either be mostly
> constant with small fluctuations, or the different subtasks should be in
> antiphase to each other (when one subtask has a small buffer size, another
> should have a big one), as shown for example in the antiphaseBufferSize picture.
> !antiphaseBufferSize.png!
>
> Unfortunately, it is not reproduced every time, which means this problem may be
> connected to the environment. But at the very least, it makes sense to try to
> understand why we get such a strange load shape when only a few input channels
> are active.
>