Hi Yuxin

Thanks for the proposal, and a big +1 for this FLIP.



It is difficult for users to calculate the right size of network memory. If
the setting is too small, the task cannot be started. If it is too large,
resources may be wasted. Ideally, the Flink framework would set a reasonable
value automatically. I have one small question, though. Network memory
depends not only on the parallelism of the task but also on the complexity of
the job DAG: the more complex the DAG, the larger the buffers that shuffle
write and shuffle read require. How can we determine how many RS and IG a
DAG has?



Best
JasonLee


---- Replied Message ----
| From | Yuxin Tan<tanyuxinw...@gmail.com> |
| Date | 12/28/2022 18:29 |
| To | <dev@flink.apache.org> |
| Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager |
Hi, Roman

Thanks for the reply.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
configurations, which are not calculated. I have described them in the FLIP
motivation section.

3. Each gate requires at least one buffer...
The timeout exception occurs when the ExclusiveBuffersPerChannel buffers
cannot be requested from the NetworkBufferPool, which is not caused by the
changes in this FLIP. In addition, setting ExclusiveBuffersPerChannel to 0
when using floating buffers can also decrease the probability of this
exception.

4. It would be great to have experimental results for jobs with different
exchange types.
Thanks for the suggestion. I have tested different exchange types, forward
and rescale, and the results show no difference from the all-to-all type.
This is understandable because the network memory usage is calculated
from numChannels, independent of the edge type.

Best,
Yuxin


On Wed, Dec 28, 2022 at 5:27 AM Roman Khachatryan <ro...@apache.org> wrote:

Hi everyone,

Thanks for the proposal and the discussion.

I couldn't find many details on how exactly the values of
ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
I guess that
- the threshold evaluation is done on JM
- floating buffers calculation is done on TM based on the current memory
available; so it is not taking into account any future tasks submitted for
that (or other) job
Is that correct?

If so, I see the following potential issues:

1. Each (sub)task might have different values because the actual
available memory might be different. E.g. some tasks might use exclusive
buffers and others only floating. That could lead to significant skew
in processing speed, and in turn to issues with checkpoints and watermarks.

2. Re-deployment of a task (e.g. on job failure) might lead to a completely
different memory configuration. That, coupled with different values per
subtask and operator, makes the performance analysis more difficult.

(Regardless of whether it's done on TM or JM):
3. Each gate requires at least one buffer [1]. So, in case when no memory
is available, TM will throw an Allocation timeout exception instead of
Insufficient buffers exception immediately. A delay here (allocation
timeout) seems like a regression.
Besides that, the regression depends on how much memory is actually
available and how much it is contended, doesn't it?
Should there still be a lower threshold of available memory, below which
the job (task) isn't accepted?
4. The same threshold for all types of shuffles will likely result in using
exclusive buffers for point-wise connections and floating buffers for
all-to-all ones. I'm not sure if that's always optimal. It would be great to
have experimental results for jobs with different exchange types, WDYT?

[1]
https://issues.apache.org/jira/browse/FLINK-24035

Regards,
Roman


On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <tanyuxinw...@gmail.com> wrote:

Hi, Weihua

Thanks for your suggestions.

1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
total buffer is not enough?

I think it's a good idea. I will try it and check the results in the PoC.
Before making all read buffers floating, I will first try to use
(ExclusiveBuffersPerChannel - i) buffers per channel. For example, if the
user has configured ExclusiveBuffersPerChannel to 4, it will check whether
the read buffers are sufficient for each value from 4 down to 1. Only when
ExclusiveBuffersPerChannel is 1 for all channels and the read buffers are
still insufficient will all read buffers use floating buffers.
If the test results prove better, the FLIP will use this method.
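
As a rough illustration of the step-down described above, here is a minimal Java sketch. It is not the FLIP's implementation: the class and method names are hypothetical, and "sufficient" is assumed to mean that ExclusiveBuffersPerChannel * numChannels fits within a given read-buffer budget.

```java
/** Hypothetical helper illustrating the step-down idea; not Flink code. */
final class ExclusiveBufferFallback {

    /**
     * Tries the configured exclusive buffers per channel first, then each
     * smaller value down to 1. Returns the first value whose total demand
     * fits the budget, or 0 to signal that all read buffers should be
     * floating. The budget comparison is an assumed notion of "sufficient".
     */
    static int chooseExclusiveBuffersPerChannel(
            int configuredPerChannel, int numChannels, int readBufferBudget) {
        for (int perChannel = configuredPerChannel; perChannel >= 1; perChannel--) {
            long required = (long) perChannel * numChannels;
            if (required <= readBufferBudget) {
                return perChannel;
            }
        }
        return 0; // even 1 buffer per channel does not fit: use floating buffers only
    }
}
```

With a configured value of 4, 400 channels, and a budget of 1000 buffers, the sketch settles on 2 exclusive buffers per channel; with 2000 channels it falls back to floating buffers only.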

2. Do we really need to change the default value of
'taskmanager.memory.network.max'?

Changing taskmanager.memory.network.max will indeed affect some
users, but a user is affected only when all 3 of the following conditions
hold.
1) The total Flink TM memory is larger than 10g (because the network memory
ratio is 0.1).
2) taskmanager.memory.network.max was not explicitly configured.
3) Other memory, such as managed memory or heap memory, is insufficient.
I think the number of jobs fulfilling these conditions is small because when
a TM uses such a large amount of memory, the network memory requirement may
also be large. And when the issue is encountered, the rollback is very
simple: configure taskmanager.memory.network.max as 1g or another value.
In addition, the reason for modifying the default value is to simplify the
network configuration in most scenarios. This change does affect a few usage
scenarios, but we should admit that no default value can meet the
requirements of all scenarios.
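
The first two conditions follow from how the network memory size is bounded. Below is a minimal Java sketch, assuming the size is the configured fraction of the total memory clamped between the min and max options. The helper name and the 64m minimum used in the checks are illustrative assumptions, not taken from this thread.

```java
/** Hypothetical sketch of fraction-based sizing with min/max bounds; not Flink code. */
final class NetworkMemorySketch {
    static final long MIB = 1024L * 1024L;
    static final long GIB = 1024L * MIB;

    /** Returns fraction * totalBytes, clamped to the range [minBytes, maxBytes]. */
    static long deriveNetworkMemory(
            long totalBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (totalBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, byFraction));
    }
}
```

Under these assumptions, a 16g TM with ratio 0.1 gets 1g of network memory when the max is 1g, and about 1.6g when the max is effectively unbounded. An 8g TM gets about 0.8g either way, so it is unaffected by the change.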

Best,
Yuxin


On Mon, Dec 26, 2022 at 8:35 PM Weihua Hu <huweihua....@gmail.com> wrote:

Hi Yuxin,
Thanks for the proposal.

"Insufficient number of network buffers" exceptions also bother us.
It's too hard for users to figure out how much network buffer they really
need. It depends on the partitioner type, parallelism, and slots per
TaskManager.

Since streaming jobs are our primary scenario, I have some questions about
streaming jobs.

1. In this FLIP, all read buffers will use floating buffers when the total
buffer is more than 'taskmanager.memory.network.read-required-buffer.max'.
Competition in buffer allocation may lead to performance regression.
How about reducing ExclusiveBuffersPerChannel to 1 first when the total
buffer is not enough?
Will this reduce performance regression in streaming?

2. Changing taskmanager.memory.network.max will affect users migrating from
lower versions.
IMO, network buffer size should not increase with total memory, especially
for streaming jobs in application mode.
For example, some ETL jobs with a rescale partitioner only require a few
network buffers.
And we already have 'taskmanager.memory.network.read-required-buffer.max'
to control the maximum read network buffer usage.
Do we really need to change the default value of
'taskmanager.memory.network.max'?

Best,
Weihua


On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:

Hi, all
Thanks to everyone for the replies and feedback!


After combining everyone's comments, the main concerns and the corresponding
adjustments are as follows.


@Guowei Ma, Thanks for your feedback.
should we introduce a _new_ non-orthogonal
option(`taskmanager.memory.network.required-buffer-per-gate.max`). That is
to say, the option will affect both streaming and batch shuffle behavior at
the same time.

1. Because the default option can meet most requirements in both Streaming
and Batch scenarios, we do not want users to adjust this default config
option by design. This configuration option is added only to preserve the
possibility of modification for users.
2. In the few cases where users really want to adjust this option, they may
not want to adjust it according to Streaming or Batch, but, for example,
according to the parallelism of the job.
3. Regarding the performance of streaming shuffle, the same problem of
insufficient memory also exists for Streaming jobs. We introduced this
configuration to enable users to decouple memory and parallelism, but it
will affect performance to some extent. By default, the feature is disabled
and does not affect performance. However, the added configuration enables
users to choose to decouple memory usage and parallelism for Streaming jobs.

It's better not to expose more implementation-related concepts to users.

Thanks for your suggestion. I will modify the option name to avoid exposing
implementation-related concepts. I have changed it to
`taskmanager.memory.network.read-required-buffer.max` in the FLIP.



@Dong Lin, Thanks for your reply.
it might be helpful to add a dedicated public interface section to describe
the config key and config semantics.

Thanks for your suggestion. I have added a public interface section to
describe the config key and config semantics clearly.

This FLIP seems to add more configs without removing any config from Flink.

This FLIP aims to reduce the number of options that need to be adjusted when
using Flink. After the FLIP, the default options can meet the requirements
in most scenarios without modifying any config options
(`taskmanager.network.memory.buffers-per-channel` and
`taskmanager.network.memory.floating-buffers-per-gate`), which helps improve
the out-of-box usability. In the long run, these two parameters may indeed
be deprecated to reduce the number of user-facing parameters, but from the
perspective of compatibility, we need to pay attention to users' feedback
before deciding to deprecate the options.



@Yanfei Lei, Thanks for your feedback.
1. Though the option is cluster level, the default value differs according
to the job type. In other words, for Batch jobs the config value is enabled
by default, with a value of 1000, and for Streaming jobs the config value is
not enabled by default.

2. I think this is a good point. The total floating buffers will not change
with ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
because this is the maximum memory threshold. But if the user explicitly
specified the ExclusiveBuffersPerChannel, the calculated result of
ExclusiveBuffersPerChannel * numChannels will change with it.
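
To make the distinction concrete, here is a small Java sketch. The threshold stays fixed while the product ExclusiveBuffersPerChannel * numChannels moves with the user's setting. The names are hypothetical, and the comparison rule is an assumption based on the behavior described in this thread.

```java
/** Hypothetical illustration of the threshold comparison; not Flink code. */
final class ReadBufferThresholdSketch {

    /** Exclusive-buffer demand of one gate: buffers per channel times channel count. */
    static long requiredExclusiveBuffers(int exclusivePerChannel, int numChannels) {
        return (long) exclusivePerChannel * numChannels;
    }

    /** True when the demand exceeds the threshold, i.e. the gate would use floating buffers. */
    static boolean exceedsThreshold(
            int exclusivePerChannel, int numChannels, int maxRequiredBuffers) {
        return requiredExclusiveBuffers(exclusivePerChannel, numChannels) > maxRequiredBuffers;
    }
}
```

For a gate with 400 channels and a threshold of 1000, 2 exclusive buffers per channel (800 in total) stay under the threshold, while 4 per channel (1600 in total) exceed it.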


Thanks again for all feedback!


Best,
Yuxin


On Mon, Dec 26, 2022 at 5:18 PM Zhu Zhu <reed...@gmail.com> wrote:

Hi Yuxin,

Thanks for creating this FLIP.

It's good if Flink does not require users to set a very large network
memory, or tune the advanced (hard-to-understand) per-channel/per-gate
buffer configs, to avoid "Insufficient number of network buffers"
exceptions, which can easily happen for large-scale jobs.

Regarding the new config
"taskmanager.memory.network.read-required-buffer.max",
I think it's still an advanced config which users may find hard to tune.
However, given that in most cases users will not need to set it, I
think it's acceptable.

So +1 for this FLIP.

In the future, I think Flink should adaptively choose whether to use
exclusive buffers according to whether there are sufficient network buffers
at runtime. Users would then no longer need to understand the above
configuration. This may require supporting transitions between exclusive
buffers and floating buffers.
A problem with making all buffers floating is that too few network buffers
can result in task slowness which is hard for users to identify. So
improvements to metrics and the web UI are also needed to expose such issues.

Thanks,
Zhu

On Mon, Dec 26, 2022 at 11:13 AM Yanfei Lei <fredia...@gmail.com> wrote:

Hi Yuxin,

Thanks for the proposal!

After reading the FLIP, I have some questions about the default value.
This FLIP seems to introduce a *new* config
option(taskmanager.memory.network.required-buffer-per-gate.max) to control
the network memory usage.
1. Is this configuration at the job level or cluster level? As the FLIP
describes, the default values for Batch jobs and Stream jobs are
different. If an explicit value is set at the cluster level, will it affect
all Batch jobs and Stream jobs on the cluster?

2. The default value for Batch jobs depends on the value of
ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel).
If the value of ExclusiveBuffersPerChannel changes, does
"taskmanager.memory.network.required-buffer-per-gate.max" need to change
with it?


Best,
Yanfei

On Sun, Dec 25, 2022 at 8:58 AM Dong Lin <lindon...@gmail.com> wrote:

Hi Yuxin,

Thanks for proposing the FLIP!

The motivation section makes sense. But it seems that the proposed change
section mixes the proposed config with the evaluation results. It is a bit
hard to understand what configs are proposed and how to describe these
configs to users. Given that the configuration setting is part of public
interfaces, it might be helpful to add a dedicated public interface section
to describe the config key and config semantics, as suggested in the FLIP
template here
<https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>.

This FLIP seems to add more configs without removing any config from Flink.
Intuitively this can make the Flink configuration harder rather than
simpler. Maybe we can get a better idea after we add a public interface
section to clarify those configs.

Thanks,
Dong


On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:

Hi, devs,

I'd like to start a discussion about FLIP-266: Simplify network memory
configurations for TaskManager[1].

When using Flink, users may encounter the following issues that affect
usability.
1. The job may fail with an "Insufficient number of network buffers"
exception.
2. Flink network memory size adjustment is complex.
When encountering these issues, users can solve some problems by adding or
adjusting parameters. However, multiple memory config options need to be
changed, and adjusting them requires understanding the detailed internal
implementation, which is impractical for most users.

To simplify network memory configurations for TaskManager and improve Flink
usability, this FLIP proposes some optimization solutions for these issues.

Looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager

Best regards,
Yuxin






