Thanks Yuepeng! The changes in https://github.com/apache/flink/pull/25218/files now look good to me.
-Max

On Sat, Jan 25, 2025 at 2:36 PM Yuepeng Pan <panyuep...@apache.org> wrote:
>
> Hi all,
>
> This email hasn't received further responses over the past few days, and as
> such, we currently lack sufficient feedback to draw a definitive conclusion.
> I'd like to proceed by merging both approaches as much as possible to reach
> a final consensus.
> I'll move forward with the following plan to address the issue and update
> the PR[1] accordingly:
>
> > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > - The primary goal of this optimization/fix is to minimize the number of
> > TaskManagers used in application mode only.
> > - The optimized logic will become the default, and the original logic
> > will be retained and can be re-enabled explicitly through a parameter.
>
> Please rest assured, I am not rushing ahead recklessly.
> If there is anything inappropriate about this conclusion/measure or the
> reasoning supporting it, I will promptly halt and revise the approach.
>
> > a. The default behavior of this new parameter (the optimized logic) aligns
> > with the expected behavior of most application deployment mode users,
> > who would have this behavior enabled by default.
> > Therefore, it doesn't add complexity in terms of configuration for them.
> > b. However, this would require the small number of users who want to keep
> > the original behavior to actively configure this setting.
> > So, this still gives users the flexibility to choose.
> > c. Since this issue is only fixed by adopting this solution in the 1.x
> > LTS versions with application deployment mode,
> > this parameter doesn't have a plan for forward compatibility, and a new
> > parameter would also be acceptable to me.
>
> Thank you all very much for your attention and help.
>
> Best,
> Yuepeng Pan
>
> [1] https://github.com/apache/flink/pull/25218
>
>
> On 2025/01/20 01:56:36 Yuepeng Pan wrote:
> > Hi, Maximilian, Rui, Matthias:
> > Thanks for the response, which gives me a general understanding of your
> > proposed approach and its implementation outline.
> >
> > Hi, All:
> > Thank you all very much for the discussion and suggestions.
> >
> > Based on the discussions we have had so far, we have reached a
> > preliminary consensus:
> >
> > - When a job runs in an application cluster, the default behavior of the
> > AdaptiveScheduler of not actively releasing TaskManager resources
> > during downscaling could be considered a bug
> > (at least from certain perspectives, this is the case).
> > - We should fix it in Flink 1.x.
> >
> > However, there's still no consensus in the discussion on how to fix this
> > issue under the following conditions:
> > - Flink 1.x series versions and application deployment mode
> > (it does not apply to session cluster mode).
> >
> > Strategy list:
> >
> > 1). Add this behavior while guarding it behind a feature
> > flag/configuration parameter in the 1.x LTS version.
> > (@Matthias If my understanding is incorrect, please correct me, thanks!)
> > a. This keeps the option for users to revert to the original behavior,
> > e.g. when ignoring idle resource occupation and focusing only
> > on the resource waiting time during rescaling, this can have some
> > positive impact.
> > b. Introducing new parameters increases complexity for users;
> > as Maximilian mentioned, we already have many parameters.
> >
> > 2). Set the behavior as the default without introducing new parameters in
> > the 1.x LTS version.
> > a. This avoids introducing new parameters and reduces complexity for users.
> > b. This removes the option for users to revert to the original behavior.
> >
> > We have to make a trade-off between the two options above in
> > order to make a choice and reach a consensus on the conclusion.
> >
> > Although Option-1) increases complexity for users, I prefer
> > Option-1) for the following reasons, provided we set the default behavior
> > in Option-1) to the new behavior:
> > a. This new parameter's default aligns with the expected behavior of most
> > application deployment mode users,
> > who would have this behavior enabled by default.
> > Therefore, it doesn't add complexity in terms of configuration for them.
> > b. However, this would require users who want to keep the original
> > behavior to actively configure this setting.
> > So, this still gives users the flexibility to choose.
> > c. Since this issue is only fixed by adopting this solution in version
> > 1.x LTS with application deployment mode,
> > this parameter doesn't have a plan for forward compatibility, and a
> > new parameter would also be acceptable to me.
> >
> > I'd like to hear more ideas about it or your opinions on the options
> > mentioned above to reach a final and reasonable consensus.
> >
> > Thank you very much.
> >
> > Best,
> > Yuepeng.
> >
> >
> > On 2025/01/15 08:54:23 Maximilian Michels wrote:
> > > Hey Yuepeng,
> > >
> > > I think that would work.
> > >
> > > Thanks,
> > > Max
> > >
> > > On Sun, Jan 12, 2025 at 3:42 PM Yuepeng Pan <panyuep...@apache.org> wrote:
> > > >
> > > > Hi, Maximilian, thank you very much for your reply and suggestions.
> > > >
> > > > That makes sense to me.
> > > >
> > > > > Do you think we could condition the DefaultSlotAssigner based
> > > > > on whether the cluster is a session or an application cluster? We
> > > > > would use the new slot assignment for application clusters. We could
> > > > > do this via an internal configuration option, but I would advise not
> > > > > to add a public one, as we have too many already.
> > > >
> > > > In my limited reading, perhaps we could use the 'execution.target'
> > > > configuration in the running cluster to make such a determination.
> > > >
> > > > The value of 'execution.target' in the following cases:
> > > >
> > > > - 0). ${flink deployment mode} -> ${the value of 'execution.target'}
> > > > - 1). yarn-application -> embedded
> > > > - 2). local application mode -> embedded
> > > > - 3). k8s-application -> embedded
> > > > - 4). yarn-per-job -> yarn-per-job
> > > > - 5). k8s-session -> kubernetes-session
> > > > - 6). yarn-session -> yarn-session
> > > > - 7). standalone session -> local
> > > > - 8). local-minicluster -> local
> > > >
> > > > For items 1), 2), 3), and 4), using the new slot prioritization
> > > > strategy mentioned previously may be a good option (a rough sketch of
> > > > this idea follows below).
> > > > If I'm wrong, please feel free to correct me.
> > > > And I would greatly appreciate it if you could provide more information.
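> > > >
> > > > A minimal sketch of what I mean (the class and helper names here are
> > > > illustrative assumptions only, not the actual DefaultSlotAssigner
> > > > internals):
> > > >
> > > >     import org.apache.flink.configuration.Configuration;
> > > >     import org.apache.flink.configuration.DeploymentOptions;
> > > >
> > > >     final class SlotAssignerSelector {
> > > >
> > > >         /** True for cases 1)-4) above, where the new strategy would apply. */
> > > >         static boolean preferMinimalTaskManagers(Configuration conf) {
> > > >             // 'execution.target' is "embedded" for yarn/k8s/local
> > > >             // application mode and "yarn-per-job" for per-job clusters,
> > > >             // per the mapping listed above.
> > > >             String target = conf.get(DeploymentOptions.TARGET);
> > > >             return "embedded".equals(target) || "yarn-per-job".equals(target);
> > > >         }
> > > >     }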
> > > >
> > > > Looking forward to your reply.
> > > >
> > > > Best,
> > > > Yuepeng Pan
> > > >
> > > >
> > > > At 2025-01-10 17:12:21, "Maximilian Michels" <m...@apache.org> wrote:
> > > > ># Recap
> > > > >
> > > > >The current slot assignment strategy via DefaultSlotAssigner is to
> > > > >pseudo-randomly assign the available TM slots. That works fine in the
> > > > >following scenarios:
> > > > >
> > > > >1. The number of TMs remains constant
> > > > >2. There is only a single slot per TaskManager
> > > > >
> > > > >As soon as we dynamically modify the job resource requirements via the
> > > > >AdaptiveScheduler, the current slot assignment strategy makes it near
> > > > >impossible to have TaskManagers without used slots, which makes
> > > > >scaling down the number of TaskManagers very unpredictable and in many
> > > > >cases impossible.
> > > > >
> > > > >The solution in https://github.com/apache/flink/pull/25218/files sorts
> > > > >the TaskManagers by least available slots. There were concerns raised
> > > > >that in session clusters this would result in more densely packed
> > > > >clusters, due to tasks being less spread out. I agree that we probably
> > > > >don't want to change this behavior in 1.x for session clusters.
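> > > > >
> > > > >To illustrate the ordering (a simplified sketch with a placeholder
> > > > >TaskManagerInfo type, not the actual code in the PR):
> > > > >
> > > > >    import java.util.Comparator;
> > > > >    import java.util.List;
> > > > >
> > > > >    final class MinimalTaskManagersFirst {
> > > > >
> > > > >        /** Placeholder for per-TaskManager state. */
> > > > >        static final class TaskManagerInfo {
> > > > >            final String id;
> > > > >            final int freeSlots;
> > > > >
> > > > >            TaskManagerInfo(String id, int freeSlots) {
> > > > >                this.id = id;
> > > > >                this.freeSlots = freeSlots;
> > > > >            }
> > > > >        }
> > > > >
> > > > >        /** Fills the most-utilized TMs first, so idle TMs can be released. */
> > > > >        static void sortForAssignment(List<TaskManagerInfo> taskManagers) {
> > > > >            taskManagers.sort(Comparator.comparingInt(tm -> tm.freeSlots));
> > > > >        }
> > > > >    }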
> > > > >
> > > > ># Proposal
> > > > >
> > > > >@Yuepeng Do you think we could condition the DefaultSlotAssigner based
> > > > >on whether the cluster is a session or an application cluster? We
> > > > >would use the new slot assignment for application clusters. We could
> > > > >do this via an internal configuration option, but I would advise not
> > > > >to add a public one, as we have too many already.
> > > > >
> > > > >-Max
> > > > >
> > > > >
> > > > >On Tue, Jan 7, 2025 at 8:22 AM Yuepeng Pan <panyuep...@apache.org> wrote:
> > > > >>
> > > > >> Thanks Max and Rui for the reply and clarification.
> > > > >>
> > > > >> IIUC, would setting the slot assignment strategy of
> > > > >> DefaultSlotAssigner to prioritize using the minimum number of
> > > > >> TaskManagers by default solve the problem?
> > > > >>
> > > > >> I'd appreciate your confirmation.
> > > > >>
> > > > >> Best,
> > > > >> Yuepeng Pan
> > > > >>
> > > > >>
> > > > >> At 2025-01-07 10:16:07, "Rui Fan" <1996fan...@gmail.com> wrote:
> > > > >> >Happy new year! And thanks Matthias, Yuepeng and Max for your comments!
> > > > >> >
> > > > >> >For the reference to FLIP-138[1] from Matthias:
> > > > >> >
> > > > >> >As FLIP-138 mentioned:
> > > > >> >
> > > > >> >> In a future version, we might think about letting the ResourceManager
> > > > >> >> balance resources across jobs.
> > > > >> >
> > > > >> >I agree with this; balancing resources might be needed only
> > > > >> >when a Flink cluster has multiple jobs (in session mode).
> > > > >> >
> > > > >> >For Yuepeng's summary:
> > > > >> >
> > > > >> >> Please let me make a brief summary based on the historical comments:
> > > > >> >> - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > > > >> >> - The primary goal of this optimization/fix is to minimize the
> > > > >> >> number of TaskManagers used in application mode.
> > > > >> >> - The optimized logic should be controlled via a parameter.
> > > > >> >
> > > > >> >IIUC, the second and third points are in conflict. The second point
> > > > >> >means the goal is to fix it in application mode, but the third point
> > > > >> >might be needed only in session mode. If we introduce a new option
> > > > >> >to balance resources in the future, it's better for it to only take
> > > > >> >effect in session mode. And the new option could be ignored in
> > > > >> >application mode.
> > > > >> >
> > > > >> >So I'm not sure whether we will fix this issue in Flink 1.x for both
> > > > >> >application mode and session mode?
> > > > >> >
> > > > >> >Generally, I'm +1 for Max's suggestion for application mode.
> > > > >> >
> > > > >> >Please correct me if I misunderstand anything.
> > > > >> >
> > > > >> >[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
> > > > >> >
> > > > >> >Best,
> > > > >> >Rui
> > > > >> >
> > > > >> >On Tue, Jan 7, 2025 at 1:52 AM Maximilian Michels <m...@apache.org> wrote:
> > > > >> >
> > > > >> >> Thanks Yuepeng for your work on this issue!
> > > > >> >>
> > > > >> >> I would advise not to add yet another config option to Flink. In
> > > > >> >> application mode, the scheduler should default to using the least
> > > > >> >> amount of resources required. From my perspective, the current
> > > > >> >> behavior is a bug, and it doesn't help that we can come up with
> > > > >> >> scenarios where the current behavior may be more optimal (e.g.
> > > > >> >> local state recovery turned on).
> > > > >> >> Ultimately, it's not what users expect and we don't need another
> > > > >> >> configuration option that users can set. We need sane defaults and
> > > > >> >> I would strongly suggest that we fix the current default,
> > > > >> >> especially because there aren't any drawbacks for existing users.
> > > > >> >>
> > > > >> >> -Max
> > > > >> >>
> > > > >> >> On Mon, Jan 6, 2025 at 7:56 AM Yuepeng Pan <panyuep...@apache.org> wrote:
> > > > >> >>
> > > > >> >> > Thank you Matthias and all for the feedback and suggestions.
> > > > >> >> >
> > > > >> >> > Please let me make a brief summary based on the historical comments:
> > > > >> >> > - It's agreed to optimize/fix this issue in the 1.x LTS versions.
> > > > >> >> > - The primary goal of this optimization/fix is to minimize the
> > > > >> >> > number of TaskManagers used in application mode.
> > > > >> >> > - The optimized logic should be controlled via a parameter.
> > > > >> >> >
> > > > >> >> > I'd like to introduce the following parameter to control whether
> > > > >> >> > the optimized logic should be enabled:
> > > > >> >> > - Name: jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred
> > > > >> >> > - Type: boolean
> > > > >> >> > - Default value: false
> > > > >> >> > - Description: This parameter defines whether the adaptive
> > > > >> >> > scheduler prioritizes using the minimum number of TaskManagers
> > > > >> >> > when scheduling tasks.
> > > > >> >> > Note: This parameter is currently suitable for cases where
> > > > >> >> > execution.state-recovery.from-local is disabled.
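> > > > >> >> >
> > > > >> >> > For illustration, enabling the optimized logic might look like
> > > > >> >> > this (a sketch only; the option is the one proposed above and
> > > > >> >> > does not exist in any released Flink version yet):
> > > > >> >> >
> > > > >> >> >     import org.apache.flink.configuration.Configuration;
> > > > >> >> >
> > > > >> >> >     Configuration conf = new Configuration();
> > > > >> >> >     // Prefer packing slots onto as few TaskManagers as possible,
> > > > >> >> >     // so idle TaskManagers can be released after downscaling.
> > > > >> >> >     conf.setString(
> > > > >> >> >         "jobmanager.adaptive-scheduler.resource.minimal-taskmanagers-preferred",
> > > > >> >> >         "true");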
> > > > >> >> >
> > > > >> >> > BTW, I'm uncertain whether the introduction of a parameter for
> > > > >> >> > this specific fix necessitates documentation via a FLIP.
> > > > >> >> > If so, I'm willing to initiate a FLIP to aid in subsequent tasks.
> > > > >> >> > If not, I will add this email to the corresponding JIRA ticket's
> > > > >> >> > comments for tracking and start the work on the MR.
> > > > >> >> >
> > > > >> >> > Any suggestion would be appreciated!
> > > > >> >> >
> > > > >> >> > Thank you!
> > > > >> >> >
> > > > >> >> > Best,
> > > > >> >> > Yuepeng Pan
> > > > >> >> >
> > > > >> >> > On 2025/01/05 18:41:11 Matthias Pohl wrote:
> > > > >> >> > > Hi everyone and sorry for the late reply. I was mostly off in
> > > > >> >> > > November and forgot about that topic in December last year.
> > > > >> >> > >
> > > > >> >> > > Thanks for summarizing and bringing up user feedback. I see the
> > > > >> >> > > problem and agree with your view that it's a topic that we
> > > > >> >> > > might want to address in the 1.x LTS version. I see how this
> > > > >> >> > > can be labeled as a bug or a feature depending on the
> > > > >> >> > > perspective. I think adding this behavior while being guarded
> > > > >> >> > > by a feature flag/configuration parameter in the 1.x LTS
> > > > >> >> > > version is reasonable.
> > > > >> >> > >
> > > > >> >> > > Best,
> > > > >> >> > > Matthias
> > > > >> >> > >
> > > > >> >> > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158873338#FLIP138:DeclarativeResourcemanagement-Howtodistributeslotsacrossdifferentjobs
> > > > >> >> > >
> > > > >> >> > > On Wed, Nov 6, 2024 at 9:21 AM Rui Fan <1996fan...@gmail.com> wrote:
> > > > >> >> > >
> > > > >> >> > > > Thanks Yuepeng for the PR and starting this discussion!
> > > > >> >> > > >
> > > > >> >> > > > And thanks Gyula and Yuanfeng for the input!
> > > > >> >> > > >
> > > > >> >> > > > I also agree to fix this behaviour in the 1.x line.
> > > > >> >> > > >
> > > > >> >> > > > The adaptive scheduler and rescaling API provide powerful
> > > > >> >> > > > capabilities to increase or decrease parallelism.
> > > > >> >> > > >
> > > > >> >> > > > The main benefit I understand of decreasing parallelism is
> > > > >> >> > > > saving resources.
> > > > >> >> > > > If decreasing parallelism can't save resources, why do users
> > > > >> >> > > > decrease it?
> > > > >> >> > > > This is why I think releasing TM resources when decreasing
> > > > >> >> > > > parallelism is a basic capability that the Adaptive Scheduler
> > > > >> >> > > > should have.
> > > > >> >> > > >
> > > > >> >> > > > Please correct me if I miss anything, thanks~
> > > > >> >> > > >
> > > > >> >> > > > Also, I believe it does not work as users expect, because
> > > > >> >> > > > this behaviour was reported multiple times in the Flink
> > > > >> >> > > > community, such as: FLINK-33977[1], FLINK-35594[2],
> > > > >> >> > > > FLINK-35903[3] and the Slack channel[4].
> > > > >> >> > > > And 1.20.x is an LTS version, so I agree to fix it in the 1.x line.
> > > > >> >> > > >
> > > > >> >> > > > [1] https://issues.apache.org/jira/browse/FLINK-33977
> > > > >> >> > > > [2] https://issues.apache.org/jira/browse/FLINK-35594
> > > > >> >> > > > [3] https://issues.apache.org/jira/browse/FLINK-35903
> > > > >> >> > > > [4] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569
> > > > >> >> > > >
> > > > >> >> > > > Best,
> > > > >> >> > > > Rui
> > > > >> >> > > >
> > > > >> >> > > > On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu <yuanf...@apache.org> wrote:
> > > > >> >> > > >
> > > > >> >> > > >> > Is it considered an error if the adaptive scheduler fails
> > > > >> >> > > >> > to release the task manager during scaling?
> > > > >> >> > > >>
> > > > >> >> > > >> +1. When we enable adaptive mode and perform scaling
> > > > >> >> > > >> operations on tasks, a significant part of the goal is to
> > > > >> >> > > >> reduce resource usage for the tasks. However, due to some
> > > > >> >> > > >> logic in the adaptive scheduler's scheduling process, the
> > > > >> >> > > >> task manager cannot be released, and the ultimate goal
> > > > >> >> > > >> cannot be achieved. Therefore, I consider this to be a mistake.
> > > > >> >> > > >>
> > > > >> >> > > >> Additionally, many tasks are currently running in this mode
> > > > >> >> > > >> and will continue to run for quite a long time (many users
> > > > >> >> > > >> are in this situation). So whether or not it is considered a
> > > > >> >> > > >> bug, I believe we need to fix it in the 1.x version.
> > > > >> >> > > >>
> > > > >> >> > > >> Yuepeng Pan <panyuep...@apache.org> wrote on Wed, Nov 6, 2024 at 14:32:
> > > > >> >> > > >>
> > > > >> >> > > >> > Hi, community.
> > > > >> >> > > >> >
> > > > >> >> > > >> > While working on ticket[1] we have received some lively
> > > > >> >> > > >> > discussions and valuable feedback[2] (thanks to Matthias,
> > > > >> >> > > >> > Rui, Gyula, Maximilian, Tison, etc.). The main questions are:
> > > > >> >> > > >> >
> > > > >> >> > > >> > When a job runs in an application cluster, could the
> > > > >> >> > > >> > default behavior of the AdaptiveScheduler of not actively
> > > > >> >> > > >> > releasing TaskManager resources during downscaling be
> > > > >> >> > > >> > considered a bug?
> > > > >> >> > > >> >
> > > > >> >> > > >> > If so, should we fix it in Flink 1.x?
> > > > >> >> > > >> >
> > > > >> >> > > >> > I'd like to start a discussion to hear more comments about
> > > > >> >> > > >> > it, to define the next step, and I have sorted out some
> > > > >> >> > > >> > information in the doc[3] regarding this discussion for you.
> > > > >> >> > > >> >
> > > > >> >> > > >> > Looking forward to your comments and attention.
> > > > >> >> > > >> >
> > > > >> >> > > >> > Thank you.
> > > > >> >> > > >> >
> > > > >> >> > > >> > Best,
> > > > >> >> > > >> > Yuepeng Pan
> > > > >> >> > > >> >
> > > > >> >> > > >> > [1] https://issues.apache.org/jira/browse/FLINK-33977
> > > > >> >> > > >> > [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
> > > > >> >> > > >> > [3] https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
> > > > >> >> > > >>
> > > > >> >> > > >> --
> > > > >> >> > > >> Best,
> > > > >> >> > > >> Yuanfeng