Thanks for your question, Roman

> wouldn't it be more clear to separate motivation from the
proposed changes?
All the contents in the motivation part are the current implementation
of Shuffle reading.
The Flip has described where ExclusiveBuffersPerChannel
and FloatingBuffersPerGate is from, the description is as follows.
    "ExclusiveBuffersPerChannel is determined by
    taskmanager.network.memory.buffers-per-channel, which is 2 by default.
    FloatingBuffersPerGate ranges from 1 to DefaultFloatingBuffersPerGate.
    DefaultFloatingBuffersPerGate is determined by
    taskmanager.network.memory.floating-buffers-per-gate, which is 8 by
default."
This part is the current implementation of network memory, not the change
introduced
from the Flip.
ExclusiveBuffersPerChannel is used in
SingleInputGateFactory#createInputChannel,
so it is the number of exclusive buffers in one channel. In one gate, the
floating
buffers FloatingBuffersPerGate are used in SingleInputGateFactory#create to
create the LocalBufferPool of the gate. The buffer number of
LocalBufferPool ranges
from 1 to the DefaultFloatingBuffersPerGate, which can be seen in
SingleInputGateFactory#createBufferPoolFactory, the
NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate controls the buffer
number of LocalBufferPool ranges from 1 to DefaultFloatingBuffersPerGate.
One gate
may contain multiple channels, we call the number of channels as
numChannels, So
the total buffers are
*ExclusiveBuffersPerChannel * numChannels + FloatingBuffersPerGate,*where
FloatingBuffersPerGate in LocalBufferPool is a range from 1 to
DefaultFloatingBuffersPerGate, so
    "the range of TotalBuffersPerGate for each InputGate is
[ExclusiveBuffersPerChannel * numChannels
     + 1, ExclusiveBuffersPerChannel * numChannels +
DefaultFloatingBuffersPerGate]",
which is described in the Flip. All the above descriptions are current
implementations, not
changes from Flip. Therefore, the motivation section describes the current
implementation,
while the proposed change section describes the changes that have occurred,
which
separates motivation from the proposed change.

> So my question is what value exactly in this range will it have and how
and
where will it be computed?
Based on the current implementation, a threshold is introduced,
taskmanager.memory.network.read-required-buffer.max. As described above,
when
the the num of exclusive buffers in a gate (ExclusiveBuffersPerChannel *
numChannels)
is greater than the threshold, all read buffers will use floating buffers,
and in order to
keep the number of buffers consistent with that before the change, we will
modify the
creating process of LocalBufferPool.
This is an implementation detail and the change is as follows. The floating
buffers
in LocalBufferPool will be in the range of [numFloatingBufferThreashold,
ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate],
which means when calling SingleInputGateFactory#createBufferPoolFactory,
the Min buffers of the LocalBufferPool are numFloatingBufferThreashold, the
Max
buffers of the LocalBufferPool are ExclusiveBuffersPerChannel * numChannels
+
DefaultFloatingBuffersPerGate. The exact buffer num depends on whether
buffers
in NetworkBufferPool are sufficient when using LocalBufferPool, which can
be seen
in the current implementation of LocalBufferPool. And the exact number of
floating
buffers in LocalBufferPool is in the above range.

Best,
Yuxin


Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 20:10写道:

> Thanks for your reply Yuxin,
>
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> > configurations, which are not calculated. I have described them in the
> FLIP
> > motivation section.
>
> The motivation section says about floating buffers:
> > FloatingBuffersPerGate is within the range of
> [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels +
> DefaultFloatingBuffersPerGate] ...
> So my question is what value exactly in this range will it have and how and
> where will it be computed?
>
> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
> to calculate it dynamically (by linear search
> from taskmanager.network.memory.buffers-per-channel down to 0).
>
> Also, if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
>
> Besides that, wouldn't it be more clear to separate motivation from the
> proposed changes?
>
> Regards,
> Roman
>
>
> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:
>
> > Hi Yuxin
> >
> >
> > Thanks for the proposal, big + 1 for this FLIP.
> >
> >
> >
> > It is difficult for users to calculate the size of network memory. If the
> > setting is too small, the task cannot be started. If the setting is too
> > large, there may be a waste of resources. As far as possible, Flink
> > framework can automatically set a reasonable value, but I have a small
> > problem. network memory is not only related to the parallelism of the
> task,
> > but also to the complexity of the task DAG. The more complex a DAG is,
> > shuffle write and shuffle read require larger buffers. How can we
> determine
> > how many RS and IG a DAG has?
> >
> >
> >
> > Best
> > JasonLee
> >
> >
> > ---- Replied Message ----
> > | From | Yuxin Tan<tanyuxinw...@gmail.com> |
> > | Date | 12/28/2022 18:29 |
> > | To | <dev@flink.apache.org> |
> > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory
> configurations
> > for TaskManager |
> > Hi, Roman
> >
> > Thanks for the replay.
> >
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> > configurations, which are not calculated. I have described them in the
> FLIP
> > motivation section.
> >
> > 3. Each gate requires at least one buffer...
> > The timeout exception occurs when the ExclusiveBuffersPerChannel
> > can not be requested from NetworkBufferPool, which is not caused by the
> > change of this Flip. In addition, if  we have set the
> > ExclusiveBuffersPerChannel
> > to 0 when using floating buffers, which can also decrease the probability
> > of
> > this exception.
> >
> > 4. It would be great to have experimental results for jobs with different
> > exchange types.
> > Thanks for the suggestion. I have a test about different exchange types,
> > forward
> > and rescale, and the results show no differences from the all-to-all
> type,
> > which
> > is also understandable, because the network memory usage is calculated
> > with numChannels, independent of the edge type.
> >
> > Best,
> > Yuxin
> >
> >
> > Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 05:27写道:
> >
> > Hi everyone,
> >
> > Thanks for the proposal and the discussion.
> >
> > I couldn't find much details on how exactly the values of
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> > I guess that
> > - the threshold evaluation is done on JM
> > - floating buffers calculation is done on TM based on the current memory
> > available; so it is not taking into account any future tasks submitted
> for
> > that (or other) job
> > Is that correct?
> >
> > If so, I see the following potential issues:
> >
> > 1. Each (sub)task might have different values because the actual
> > available memory might be different. E.g. some tasks might use exclusive
> > buffers and others only floating. That could lead to significant skew
> > in processing speed, and in turn to issues with checkpoints and
> watermarks.
> >
> > 2. Re-deployment of a task (e.g. on job failure) might lead to a
> completely
> > different memory configuration. That, coupled with different values per
> > subtask and operator, makes the performance analysis more difficult.
> >
> > (Regardless of whether it's done on TM or JM):
> > 3. Each gate requires at least one buffer [1]. So, in case when no memory
> > is available, TM will throw an Allocation timeout exception instead of
> > Insufficient buffers exception immediately. A delay here (allocation
> > timeout) seems like a regression.
> > Besides that, the regression depends on how much memory is actually
> > available and how much it is contended, doesn't it?
> > Should there still be a lower threshold of available memory, below which
> > the job (task) isn't accepted?
> > 4. The same threshold for all types of shuffles will likely result in
> using
> > exclusive buffers
> > for point-wise connections and floating buffers for all-to-all ones. I'm
> > not sure if that's always optimal. It would be great to have experimental
> > results for jobs with different exchange types, WDYT?
> >
> > [1]
> > https://issues.apache.org/jira/browse/FLINK-24035
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <tanyuxinw...@gmail.com>
> wrote:
> >
> > Hi, Weihua
> >
> > Thanks for your suggestions.
> >
> > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> > total buffer is not enough?
> >
> > I think it's a good idea. Will try and check the results in PoC. Before
> > all
> > read buffers use floating buffers, I will try to use
> > (ExclusiveBuffersPerChannel - i)
> > buffers per channel first. For example, if the user has configured
> > ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> > are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
> > all channels is 1 and all read buffers are insufficient, all read buffers
> > will use floating buffers.
> > If the test results prove better, the FLIP will use this method.
> >
> > 2. Do we really need to change the default value of
> > 'taskmanager.memory.network.max'?
> >
> > Changing taskmanager.memory.network.max will indeed affect some
> > users, but the user only is affected when the 3 conditions are fulfilled.
> > 1) Flink total TM memory is larger than 10g (because the network memory
> > ratio is 0.1).
> > 2) taskmanager.memory.network.max was not initially configured.
> > 3) Other memory, such as managed memory or heap memory, is insufficient.
> > I think the number of jobs fulfilling the conditions is small because
> > when
> > TM
> > uses such a large amount of memory, the network memory requirement may
> > also be large. And when encountering the issue, the rollback method is
> > very
> > simple,
> > configuring taskmanager.memory.network.max as 1g or other values.
> > In addition, the reason for modifying the default value is to simplify
> > the
> > network
> > configurations in most scenarios. This change does affect a few usage
> > scenarios,
> > but we should admit that setting the default to any value may not meet
> > the requirements of all scenarios.
> >
> > Best,
> > Yuxin
> >
> >
> > Weihua Hu <huweihua....@gmail.com> 于2022年12月26日周一 20:35写道:
> >
> > Hi Yuxin,
> > Thanks for the proposal.
> >
> > "Insufficient number of network buffers" exceptions also bother us.
> > It's
> > too hard for users to figure out
> > how much network buffer they really need. It relates to partitioner
> > type,
> > parallelism, slots per taskmanager.
> >
> > Since streaming jobs are our primary scenario, I have some questions
> > about
> > streaming jobs.
> >
> > 1. In this FLIP, all read buffers will use floating buffers when the
> > total
> > buffer is more than
> > 'taskmanager.memory.network.read-required-buffer.max'. Competition in
> > buffer allocation led to preference regression.
> > How about reducing ExclusiveBuffersPerChannel to 1 first when the total
> > buffer is not enough?
> > Will this reduce performance regression in streaming?
> >
> > 2. Changing taskmanager.memory.network.max will affect user migration
> > from
> > the lower version.
> > IMO, network buffer size should not increase with total memory,
> > especially
> > for streaming jobs with application mode.
> > For example, some ETL jobs with rescale partitioner only require a few
> > network buffers.
> > And we already have
> > 'taskmanager.memory.network.read-required-buffer.max'
> > to control maximum read network buffer usage.
> > Do we really need to change the default value of
> > 'taskmanager.memory.network.max'?
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <tanyuxinw...@gmail.com>
> > wrote:
> >
> > Hi, all
> > Thanks for the reply and feedback for everyone!
> >
> >
> > After combining everyone's comments, the main concerns, and
> > corresponding
> > adjustments are as follows.
> >
> >
> > @Guowei Ma, Thanks for your feedback.
> > should we introduce a _new_ non-orthogonal
> > option(`taskmanager.memory.network.required-buffer-per-gate.max`).
> > That
> > is
> > to say, the option will affect both streaming and batch shuffle
> > behavior
> > at
> > the
> > same time.
> >
> > 1. Because the default option can meet most requirements no matter in
> > Streaming
> > or Batch scenarios. We do not want users to adjust this default
> > config
> > option by
> > design. This configuration option is added only to preserve the
> > possibility
> > of
> > modification options for users.
> > 2. In a few cases, if you really want to adjust this option, users
> > may
> > not
> > expect to
> > adjust the option according to Streaming or Batch, for example,
> > according
> > to the
> > parallelism of the job.
> > 3. Regarding the performance of streaming shuffle, the same problem
> > of
> > insufficient memory also exists for Streaming jobs. We introduced
> > this
> > configuration
> > to enable users to decouple memory and parallelism, but it will
> > affect
> > some
> > performance. By default, the feature is disabled and does not affect
> > performance.
> > However, the added configuration enables users to choose to decouple
> > memory
> > usage and parallelism for Streaming jobs.
> >
> > It's better not to expose more implementation-related concepts to
> > users.
> >
> > Thanks for you suggestion. I will modify the option name to avoid
> > exposing
> > implementation-related concepts. I have changed it to
> > `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
> >
> >
> >
> > @Dong Lin, Thanks for your reply.
> > it might be helpful to add a dedicated public interface section to
> > describe
> > the config key and config semantics.
> >
> > Thanks for your suggestion. I have added public interface section to
> > describe
> > the config key and config semantics clearly.
> >
> > This FLIP seems to add more configs without removing any config
> > from
> > Flink.
> >
> > This Flip is to reduce the number of options to be adjusted when
> > using
> > Flink.
> > After the Flip, the default option can meet the requirements in most
> > sceneries
> > rather than modifying any config
> > options(`taskmanager.network.memory.buffers-per-channel`
> > and `taskmanager.network.memory.floating-buffers-per-gate`), which is
> > helpful
> > to improve the out-of-box usability. In the long run, these two
> > parameters
> > `taskmanager.network.memory.buffers-per-channel` and
> > `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
> > deprecated
> > to reduce user parameters, but from the perspective of compatibility,
> > we
> > need to
> > pay attention to users' feedback before deciding to deprecate the
> > options.
> >
> >
> >
> > @Yanfei Lei,Thanks for your feedback.
> > 1. Through the option is cluster level, the default value is
> > different
> > according to the
> > job type. In other words, by default, for Batch jobs, the config
> > value
> > is
> > enabled, 1000.
> > And for Streaming jobs, the config value is not enabled by default.
> >
> > 2. I think this is a good point. The total floating buffers will not
> > change
> > with
> >
> >
> >
> >
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
> > because this is the maximum memory threshold. But if the user
> > explicitly
> > specified
> > the ExclusiveBuffersPerChannel, the calculated result of
> > ExclusiveBuffersPerChannel * numChannels will change with it.
> >
> >
> > Thanks again for all feedback!
> >
> >
> > Best,
> > Yuxin
> >
> >
> > Zhu Zhu <reed...@gmail.com> 于2022年12月26日周一 17:18写道:
> >
> > Hi Yuxin,
> >
> > Thanks for creating this FLIP.
> >
> > It's good if Flink does not require users to set a very large
> > network
> > memory, or tune the advanced(hard-to-understand)
> > per-channel/per-gate
> > buffer configs, to avoid "Insufficient number of network buffers"
> > exceptions
> > which can easily happen for large scale jobs.
> >
> > Regarding the new config
> > "taskmanager.memory.network.read-required-buffer.max",
> > I think it's still an advanced config which users may feel hard to
> > tune.
> > However, given that in most cases users will not need to set it, I
> > think it's acceptable.
> >
> > So +1 for this FLIP.
> >
> > In the future, I think Flink should adaptively select to use
> > exclusive
> > buffers
> > or not according to whether there are sufficient network buffers at
> > runtime.
> > Users then no longer need to understand the above configuration.
> > This
> > may
> > require supporting transitions between exclusive buffers and
> > floating
> > buffers.
> > A problem of all buffer floating is that too few network buffers
> > can
> > result
> > in task slowness which is hard to identify by users. So it's also
> > needed
> > to
> > do improvements on metrics and web UI to expose such issues.
> >
> > Thanks,
> > Zhu
> >
> > Yanfei Lei <fredia...@gmail.com> 于2022年12月26日周一 11:13写道:
> >
> > Hi Yuxin,
> >
> > Thanks for the proposal!
> >
> > After reading the FLIP, I have some questions about the default
> > value.
> > This FLIP seems to introduce a *new* config
> > option(taskmanager.memory.network.required-buffer-per-gate.max)
> > to
> > control
> > the network memory usage.
> > 1. Is this configuration at the job level or cluster level? As
> > the
> > FLIP
> > described, the default values of the Batch job and Stream job are
> > different, If an explicit value is set for cluster level, will it
> > affect
> > all Batch jobs and Stream jobs on the cluster?
> >
> > 2. The default value of Batch Job depends on the value of
> >
> >
> >
> >
> >
> >
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
> > if the value of ExclusiveBuffersPerChannel changed, does
> > "taskmanager.memory.network.required-buffer-per-gate.max" need to
> > change
> > with it?
> >
> >
> > Best,
> > Yanfei
> >
> > Dong Lin <lindon...@gmail.com> 于2022年12月25日周日 08:58写道:
> >
> > Hi Yuxin,
> >
> > Thanks for proposing the FLIP!
> >
> > The motivation section makes sense. But it seems that the
> > proposed
> > change
> > section mixes the proposed config with the evaluation results.
> > It
> > is
> > a
> > bit
> > hard to understand what configs are proposed and how to
> > describe
> > these
> > configs to users. Given that the configuration setting is part
> > of
> > public
> > interfaces, it might be helpful to add a dedicated public
> > interface
> > section
> > to describe the config key and config semantics, as suggested
> > in
> > the
> > FLIP
> > template here
> > <
> >
> >
> >
> >
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >
> > .
> >
> > This FLIP seems to add more configs without removing any config
> > from
> > Flink.
> > Intuitively this can make the Flink configuration harder rather
> > than
> > simpler. Maybe we can get a better idea after we add a public
> > interface
> > section to clarify those configs.
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
> > tanyuxinw...@gmail.com>
> > wrote:
> >
> > Hi, devs,
> >
> > I'd like to start a discussion about FLIP-266: Simplify
> > network
> > memory
> > configurations for TaskManager[1].
> >
> > When using Flink, users may encounter the following issues
> > that
> > affect
> > usability.
> > 1. The job may fail with an "Insufficient number of network
> > buffers"
> > exception.
> > 2. Flink network memory size adjustment is complex.
> > When encountering these issues, users can solve some problems
> > by
> > adding
> > or
> > adjusting parameters. However, multiple memory config options
> > should
> > be
> > changed. The config option adjustment requires understanding
> > the
> > detailed
> > internal implementation, which is impractical for most users.
> >
> > To simplify network memory configurations for TaskManager and
> > improve
> > Flink
> > usability, this FLIP proposed some optimization solutions for
> > the
> > issues.
> >
> > Looking forward to your feedback.
> >
> > [1]
> >
> >
> >
> >
> >
> >
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
> >
> > Best regards,
> > Yuxin
> >
> >
> >
> >
> >
> >
> >
> >
>

Reply via email to