Hi, JasonLee

Thanks for the feedback.

> How can we determine how many RS and IG a DAG has?

Network memory is related to the parallelism and the complexity of the task
DAG, which I think is correct. However, this Flip can only improve the part
issue of the total issue, mainly focusing on memory optimization of Shuffle
reading. We can only limit the total read buffers in one InputGate, but can
not determine the number of RS and IG. The good news is that this is an
independent problem. Maybe we could try to optimize and solve that
problem later.


Best,
Yuxin


JasonLee <17610775...@163.com> 于2022年12月28日周三 19:18写道:

> Hi Yuxin
>
>
> Thanks for the proposal, big + 1 for this FLIP.
>
>
>
> It is difficult for users to calculate the size of network memory. If the
> setting is too small, the task cannot be started. If the setting is too
> large, there may be a waste of resources. As far as possible, Flink
> framework can automatically set a reasonable value, but I have a small
> problem. network memory is not only related to the parallelism of the task,
> but also to the complexity of the task DAG. The more complex a DAG is,
> shuffle write and shuffle read require larger buffers. How can we determine
> how many RS and IG a DAG has?
>
>
>
> Best
> JasonLee
>
>
> ---- Replied Message ----
> | From | Yuxin Tan<tanyuxinw...@gmail.com> |
> | Date | 12/28/2022 18:29 |
> | To | <dev@flink.apache.org> |
> | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations
> for TaskManager |
> Hi, Roman
>
> Thanks for the replay.
>
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the FLIP
> motivation section.
>
> 3. Each gate requires at least one buffer...
> The timeout exception occurs when the ExclusiveBuffersPerChannel
> can not be requested from NetworkBufferPool, which is not caused by the
> change of this Flip. In addition, if  we have set the
> ExclusiveBuffersPerChannel
> to 0 when using floating buffers, which can also decrease the probability
> of
> this exception.
>
> 4. It would be great to have experimental results for jobs with different
> exchange types.
> Thanks for the suggestion. I have a test about different exchange types,
> forward
> and rescale, and the results show no differences from the all-to-all type,
> which
> is also understandable, because the network memory usage is calculated
> with numChannels, independent of the edge type.
>
> Best,
> Yuxin
>
>
> Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 05:27写道:
>
> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find much details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, in case when no memory
> is available, TM will throw an Allocation timeout exception instead of
> Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>
> Hi, Weihua
>
> Thanks for your suggestions.
>
> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> total buffer is not enough?
>
> I think it's a good idea. Will try and check the results in PoC. Before
> all
> read buffers use floating buffers, I will try to use
> (ExclusiveBuffersPerChannel - i)
> buffers per channel first. For example, if the user has configured
> ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
> all channels is 1 and all read buffers are insufficient, all read buffers
> will use floating buffers.
> If the test results prove better, the FLIP will use this method.
>
> 2. Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Changing taskmanager.memory.network.max will indeed affect some
> users, but the user only is affected when the 3 conditions are fulfilled.
> 1) Flink total TM memory is larger than 10g (because the network memory
> ratio is 0.1).
> 2) taskmanager.memory.network.max was not initially configured.
> 3) Other memory, such as managed memory or heap memory, is insufficient.
> I think the number of jobs fulfilling the conditions is small because
> when
> TM
> uses such a large amount of memory, the network memory requirement may
> also be large. And when encountering the issue, the rollback method is
> very
> simple,
> configuring taskmanager.memory.network.max as 1g or other values.
> In addition, the reason for modifying the default value is to simplify
> the
> network
> configurations in most scenarios. This change does affect a few usage
> scenarios,
> but we should admit that setting the default to any value may not meet
> the requirements of all scenarios.
>
> Best,
> Yuxin
>
>
> Weihua Hu <huweihua....@gmail.com> 于2022年12月26日周一 20:35写道:
>
> Hi Yuxin,
> Thanks for the proposal.
>
> "Insufficient number of network buffers" exceptions also bother us.
> It's
> too hard for users to figure out
> how much network buffer they really need. It relates to partitioner
> type,
> parallelism, slots per taskmanager.
>
> Since streaming jobs are our primary scenario, I have some questions
> about
> streaming jobs.
>
> 1. In this FLIP, all read buffers will use floating buffers when the
> total
> buffer is more than
> 'taskmanager.memory.network.read-required-buffer.max'. Competition in
> buffer allocation led to preference regression.
> How about reducing ExclusiveBuffersPerChannel to 1 first when the total
> buffer is not enough?
> Will this reduce performance regression in streaming?
>
> 2. Changing taskmanager.memory.network.max will affect user migration
> from
> the lower version.
> IMO, network buffer size should not increase with total memory,
> especially
> for streaming jobs with application mode.
> For example, some ETL jobs with rescale partitioner only require a few
> network buffers.
> And we already have
> 'taskmanager.memory.network.read-required-buffer.max'
> to control maximum read network buffer usage.
> Do we really need to change the default value of
> 'taskmanager.memory.network.max'?
>
> Best,
> Weihua
>
>
> On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <tanyuxinw...@gmail.com>
> wrote:
>
> Hi, all
> Thanks for the reply and feedback for everyone!
>
>
> After combining everyone's comments, the main concerns, and
> corresponding
> adjustments are as follows.
>
>
> @Guowei Ma, Thanks for your feedback.
> should we introduce a _new_ non-orthogonal
> option(`taskmanager.memory.network.required-buffer-per-gate.max`).
> That
> is
> to say, the option will affect both streaming and batch shuffle
> behavior
> at
> the
> same time.
>
> 1. Because the default option can meet most requirements no matter in
> Streaming
> or Batch scenarios. We do not want users to adjust this default
> config
> option by
> design. This configuration option is added only to preserve the
> possibility
> of
> modification options for users.
> 2. In a few cases, if you really want to adjust this option, users
> may
> not
> expect to
> adjust the option according to Streaming or Batch, for example,
> according
> to the
> parallelism of the job.
> 3. Regarding the performance of streaming shuffle, the same problem
> of
> insufficient memory also exists for Streaming jobs. We introduced
> this
> configuration
> to enable users to decouple memory and parallelism, but it will
> affect
> some
> performance. By default, the feature is disabled and does not affect
> performance.
> However, the added configuration enables users to choose to decouple
> memory
> usage and parallelism for Streaming jobs.
>
> It's better not to expose more implementation-related concepts to
> users.
>
> Thanks for you suggestion. I will modify the option name to avoid
> exposing
> implementation-related concepts. I have changed it to
> `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>
>
>
> @Dong Lin, Thanks for your reply.
> it might be helpful to add a dedicated public interface section to
> describe
> the config key and config semantics.
>
> Thanks for your suggestion. I have added public interface section to
> describe
> the config key and config semantics clearly.
>
> This FLIP seems to add more configs without removing any config
> from
> Flink.
>
> This Flip is to reduce the number of options to be adjusted when
> using
> Flink.
> After the Flip, the default option can meet the requirements in most
> sceneries
> rather than modifying any config
> options(`taskmanager.network.memory.buffers-per-channel`
> and `taskmanager.network.memory.floating-buffers-per-gate`), which is
> helpful
> to improve the out-of-box usability. In the long run, these two
> parameters
> `taskmanager.network.memory.buffers-per-channel` and
> `taskmanager.network.memory.floating-buffers-per-gate` may indeed be
> deprecated
> to reduce user parameters, but from the perspective of compatibility,
> we
> need to
> pay attention to users' feedback before deciding to deprecate the
> options.
>
>
>
> @Yanfei Lei,Thanks for your feedback.
> 1. Through the option is cluster level, the default value is
> different
> according to the
> job type. In other words, by default, for Batch jobs, the config
> value
> is
> enabled, 1000.
> And for Streaming jobs, the config value is not enabled by default.
>
> 2. I think this is a good point. The total floating buffers will not
> change
> with
>
>
>
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel)
> because this is the maximum memory threshold. But if the user
> explicitly
> specified
> the ExclusiveBuffersPerChannel, the calculated result of
> ExclusiveBuffersPerChannel * numChannels will change with it.
>
>
> Thanks again for all feedback!
>
>
> Best,
> Yuxin
>
>
> Zhu Zhu <reed...@gmail.com> 于2022年12月26日周一 17:18写道:
>
> Hi Yuxin,
>
> Thanks for creating this FLIP.
>
> It's good if Flink does not require users to set a very large
> network
> memory, or tune the advanced(hard-to-understand)
> per-channel/per-gate
> buffer configs, to avoid "Insufficient number of network buffers"
> exceptions
> which can easily happen for large scale jobs.
>
> Regarding the new config
> "taskmanager.memory.network.read-required-buffer.max",
> I think it's still an advanced config which users may feel hard to
> tune.
> However, given that in most cases users will not need to set it, I
> think it's acceptable.
>
> So +1 for this FLIP.
>
> In the future, I think Flink should adaptively select to use
> exclusive
> buffers
> or not according to whether there are sufficient network buffers at
> runtime.
> Users then no longer need to understand the above configuration.
> This
> may
> require supporting transitions between exclusive buffers and
> floating
> buffers.
> A problem of all buffer floating is that too few network buffers
> can
> result
> in task slowness which is hard to identify by users. So it's also
> needed
> to
> do improvements on metrics and web UI to expose such issues.
>
> Thanks,
> Zhu
>
> Yanfei Lei <fredia...@gmail.com> 于2022年12月26日周一 11:13写道:
>
> Hi Yuxin,
>
> Thanks for the proposal!
>
> After reading the FLIP, I have some questions about the default
> value.
> This FLIP seems to introduce a *new* config
> option(taskmanager.memory.network.required-buffer-per-gate.max)
> to
> control
> the network memory usage.
> 1. Is this configuration at the job level or cluster level? As
> the
> FLIP
> described, the default values of the Batch job and Stream job are
> different, If an explicit value is set for cluster level, will it
> affect
> all Batch jobs and Stream jobs on the cluster?
>
> 2. The default value of Batch Job depends on the value of
>
>
>
>
>
> ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel),
> if the value of ExclusiveBuffersPerChannel changed, does
> "taskmanager.memory.network.required-buffer-per-gate.max" need to
> change
> with it?
>
>
> Best,
> Yanfei
>
> Dong Lin <lindon...@gmail.com> 于2022年12月25日周日 08:58写道:
>
> Hi Yuxin,
>
> Thanks for proposing the FLIP!
>
> The motivation section makes sense. But it seems that the
> proposed
> change
> section mixes the proposed config with the evaluation results.
> It
> is
> a
> bit
> hard to understand what configs are proposed and how to
> describe
> these
> configs to users. Given that the configuration setting is part
> of
> public
> interfaces, it might be helpful to add a dedicated public
> interface
> section
> to describe the config key and config semantics, as suggested
> in
> the
> FLIP
> template here
> <
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> .
>
> This FLIP seems to add more configs without removing any config
> from
> Flink.
> Intuitively this can make the Flink configuration harder rather
> than
> simpler. Maybe we can get a better idea after we add a public
> interface
> section to clarify those configs.
>
> Thanks,
> Dong
>
>
> On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <
> tanyuxinw...@gmail.com>
> wrote:
>
> Hi, devs,
>
> I'd like to start a discussion about FLIP-266: Simplify
> network
> memory
> configurations for TaskManager[1].
>
> When using Flink, users may encounter the following issues
> that
> affect
> usability.
> 1. The job may fail with an "Insufficient number of network
> buffers"
> exception.
> 2. Flink network memory size adjustment is complex.
> When encountering these issues, users can solve some problems
> by
> adding
> or
> adjusting parameters. However, multiple memory config options
> should
> be
> changed. The config option adjustment requires understanding
> the
> detailed
> internal implementation, which is impractical for most users.
>
> To simplify network memory configurations for TaskManager and
> improve
> Flink
> usability, this FLIP proposed some optimization solutions for
> the
> issues.
>
> Looking forward to your feedback.
>
> [1]
>
>
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>
> Best regards,
> Yuxin
>
>
>
>
>
>
>
>

Reply via email to