Hi Yuxin
Thanks for the proposal, big + 1 for this FLIP. It is difficult for users to calculate the size of network memory. If the setting is too small, the task cannot be started. If the setting is too large, there may be a waste of resources. As far as possible, Flink framework can automatically set a reasonable value, but I have a small problem. network memory is not only related to the parallelism of the task, but also to the complexity of the task DAG. The more complex a DAG is, shuffle write and shuffle read require larger buffers. How can we determine how many RS and IG a DAG has? Best JasonLee ---- Replied Message ---- | From | Yuxin Tan<tanyuxinw...@gmail.com> | | Date | 12/28/2022 18:29 | | To | <dev@flink.apache.org> | | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager | Hi, Roman Thanks for the replay. ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from configurations, which are not calculated. I have described them in the FLIP motivation section. 3. Each gate requires at least one buffer... The timeout exception occurs when the ExclusiveBuffersPerChannel can not be requested from NetworkBufferPool, which is not caused by the change of this Flip. In addition, if we have set the ExclusiveBuffersPerChannel to 0 when using floating buffers, which can also decrease the probability of this exception. 4. It would be great to have experimental results for jobs with different exchange types. Thanks for the suggestion. I have a test about different exchange types, forward and rescale, and the results show no differences from the all-to-all type, which is also understandable, because the network memory usage is calculated with numChannels, independent of the edge type. Best, Yuxin Roman Khachatryan <ro...@apache.org> 于2022年12月28日周三 05:27写道: Hi everyone, Thanks for the proposal and the discussion. I couldn't find much details on how exactly the values of ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated. I guess that - the threshold evaluation is done on JM - floating buffers calculation is done on TM based on the current memory available; so it is not taking into account any future tasks submitted for that (or other) job Is that correct? If so, I see the following potential issues: 1. Each (sub)task might have different values because the actual available memory might be different. E.g. some tasks might use exclusive buffers and others only floating. That could lead to significant skew in processing speed, and in turn to issues with checkpoints and watermarks. 2. Re-deployment of a task (e.g. on job failure) might lead to a completely different memory configuration. That, coupled with different values per subtask and operator, makes the performance analysis more difficult. (Regardless of whether it's done on TM or JM): 3. Each gate requires at least one buffer [1]. So, in case when no memory is available, TM will throw an Allocation timeout exception instead of Insufficient buffers exception immediately. A delay here (allocation timeout) seems like a regression. Besides that, the regression depends on how much memory is actually available and how much it is contended, doesn't it? Should there still be a lower threshold of available memory, below which the job (task) isn't accepted? 4. The same threshold for all types of shuffles will likely result in using exclusive buffers for point-wise connections and floating buffers for all-to-all ones. I'm not sure if that's always optimal. It would be great to have experimental results for jobs with different exchange types, WDYT? [1] https://issues.apache.org/jira/browse/FLINK-24035 Regards, Roman On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <tanyuxinw...@gmail.com> wrote: Hi, Weihua Thanks for your suggestions. 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the total buffer is not enough? I think it's a good idea. Will try and check the results in PoC. Before all read buffers use floating buffers, I will try to use (ExclusiveBuffersPerChannel - i) buffers per channel first. For example, if the user has configured ExclusiveBuffersPerChannel to 4, it will check whether all read buffers are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of all channels is 1 and all read buffers are insufficient, all read buffers will use floating buffers. If the test results prove better, the FLIP will use this method. 2. Do we really need to change the default value of 'taskmanager.memory.network.max'? Changing taskmanager.memory.network.max will indeed affect some users, but the user only is affected when the 3 conditions are fulfilled. 1) Flink total TM memory is larger than 10g (because the network memory ratio is 0.1). 2) taskmanager.memory.network.max was not initially configured. 3) Other memory, such as managed memory or heap memory, is insufficient. I think the number of jobs fulfilling the conditions is small because when TM uses such a large amount of memory, the network memory requirement may also be large. And when encountering the issue, the rollback method is very simple, configuring taskmanager.memory.network.max as 1g or other values. In addition, the reason for modifying the default value is to simplify the network configurations in most scenarios. This change does affect a few usage scenarios, but we should admit that setting the default to any value may not meet the requirements of all scenarios. Best, Yuxin Weihua Hu <huweihua....@gmail.com> 于2022年12月26日周一 20:35写道: Hi Yuxin, Thanks for the proposal. "Insufficient number of network buffers" exceptions also bother us. It's too hard for users to figure out how much network buffer they really need. It relates to partitioner type, parallelism, slots per taskmanager. Since streaming jobs are our primary scenario, I have some questions about streaming jobs. 1. In this FLIP, all read buffers will use floating buffers when the total buffer is more than 'taskmanager.memory.network.read-required-buffer.max'. Competition in buffer allocation led to preference regression. How about reducing ExclusiveBuffersPerChannel to 1 first when the total buffer is not enough? Will this reduce performance regression in streaming? 2. Changing taskmanager.memory.network.max will affect user migration from the lower version. IMO, network buffer size should not increase with total memory, especially for streaming jobs with application mode. For example, some ETL jobs with rescale partitioner only require a few network buffers. And we already have 'taskmanager.memory.network.read-required-buffer.max' to control maximum read network buffer usage. Do we really need to change the default value of 'taskmanager.memory.network.max'? Best, Weihua On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote: Hi, all Thanks for the reply and feedback for everyone! After combining everyone's comments, the main concerns, and corresponding adjustments are as follows. @Guowei Ma, Thanks for your feedback. should we introduce a _new_ non-orthogonal option(`taskmanager.memory.network.required-buffer-per-gate.max`). That is to say, the option will affect both streaming and batch shuffle behavior at the same time. 1. Because the default option can meet most requirements no matter in Streaming or Batch scenarios. We do not want users to adjust this default config option by design. This configuration option is added only to preserve the possibility of modification options for users. 2. In a few cases, if you really want to adjust this option, users may not expect to adjust the option according to Streaming or Batch, for example, according to the parallelism of the job. 3. Regarding the performance of streaming shuffle, the same problem of insufficient memory also exists for Streaming jobs. We introduced this configuration to enable users to decouple memory and parallelism, but it will affect some performance. By default, the feature is disabled and does not affect performance. However, the added configuration enables users to choose to decouple memory usage and parallelism for Streaming jobs. It's better not to expose more implementation-related concepts to users. Thanks for you suggestion. I will modify the option name to avoid exposing implementation-related concepts. I have changed it to `taskmanager.memory.network.read-required-buffer.max` in the FLIP. @Dong Lin, Thanks for your reply. it might be helpful to add a dedicated public interface section to describe the config key and config semantics. Thanks for your suggestion. I have added public interface section to describe the config key and config semantics clearly. This FLIP seems to add more configs without removing any config from Flink. This Flip is to reduce the number of options to be adjusted when using Flink. After the Flip, the default option can meet the requirements in most sceneries rather than modifying any config options(`taskmanager.network.memory.buffers-per-channel` and `taskmanager.network.memory.floating-buffers-per-gate`), which is helpful to improve the out-of-box usability. In the long run, these two parameters `taskmanager.network.memory.buffers-per-channel` and `taskmanager.network.memory.floating-buffers-per-gate` may indeed be deprecated to reduce user parameters, but from the perspective of compatibility, we need to pay attention to users' feedback before deciding to deprecate the options. @Yanfei Lei,Thanks for your feedback. 1. Through the option is cluster level, the default value is different according to the job type. In other words, by default, for Batch jobs, the config value is enabled, 1000. And for Streaming jobs, the config value is not enabled by default. 2. I think this is a good point. The total floating buffers will not change with ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel) because this is the maximum memory threshold. But if the user explicitly specified the ExclusiveBuffersPerChannel, the calculated result of ExclusiveBuffersPerChannel * numChannels will change with it. Thanks again for all feedback! Best, Yuxin Zhu Zhu <reed...@gmail.com> 于2022年12月26日周一 17:18写道: Hi Yuxin, Thanks for creating this FLIP. It's good if Flink does not require users to set a very large network memory, or tune the advanced(hard-to-understand) per-channel/per-gate buffer configs, to avoid "Insufficient number of network buffers" exceptions which can easily happen for large scale jobs. Regarding the new config "taskmanager.memory.network.read-required-buffer.max", I think it's still an advanced config which users may feel hard to tune. However, given that in most cases users will not need to set it, I think it's acceptable. So +1 for this FLIP. In the future, I think Flink should adaptively select to use exclusive buffers or not according to whether there are sufficient network buffers at runtime. Users then no longer need to understand the above configuration. This may require supporting transitions between exclusive buffers and floating buffers. A problem of all buffer floating is that too few network buffers can result in task slowness which is hard to identify by users. So it's also needed to do improvements on metrics and web UI to expose such issues. Thanks, Zhu Yanfei Lei <fredia...@gmail.com> 于2022年12月26日周一 11:13写道: Hi Yuxin, Thanks for the proposal! After reading the FLIP, I have some questions about the default value. This FLIP seems to introduce a *new* config option(taskmanager.memory.network.required-buffer-per-gate.max) to control the network memory usage. 1. Is this configuration at the job level or cluster level? As the FLIP described, the default values of the Batch job and Stream job are different, If an explicit value is set for cluster level, will it affect all Batch jobs and Stream jobs on the cluster? 2. The default value of Batch Job depends on the value of ExclusiveBuffersPerChannel(taskmanager.network.memory.buffers-per-channel), if the value of ExclusiveBuffersPerChannel changed, does "taskmanager.memory.network.required-buffer-per-gate.max" need to change with it? Best, Yanfei Dong Lin <lindon...@gmail.com> 于2022年12月25日周日 08:58写道: Hi Yuxin, Thanks for proposing the FLIP! The motivation section makes sense. But it seems that the proposed change section mixes the proposed config with the evaluation results. It is a bit hard to understand what configs are proposed and how to describe these configs to users. Given that the configuration setting is part of public interfaces, it might be helpful to add a dedicated public interface section to describe the config key and config semantics, as suggested in the FLIP template here < https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals . This FLIP seems to add more configs without removing any config from Flink. Intuitively this can make the Flink configuration harder rather than simpler. Maybe we can get a better idea after we add a public interface section to clarify those configs. Thanks, Dong On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan < tanyuxinw...@gmail.com> wrote: Hi, devs, I'd like to start a discussion about FLIP-266: Simplify network memory configurations for TaskManager[1]. When using Flink, users may encounter the following issues that affect usability. 1. The job may fail with an "Insufficient number of network buffers" exception. 2. Flink network memory size adjustment is complex. When encountering these issues, users can solve some problems by adding or adjusting parameters. However, multiple memory config options should be changed. The config option adjustment requires understanding the detailed internal implementation, which is impractical for most users. To simplify network memory configurations for TaskManager and improve Flink usability, this FLIP proposed some optimization solutions for the issues. Looking forward to your feedback. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager Best regards, Yuxin