Thanks for the response, Rui and Yuepeng.

>> Rui
> 1. The default value is None, right?
Exactly.

> 2. When it's set to Tasks, how to assign slots to TM?
It's option2 at the moment. However, I think it's just implementation
details and can be changed/refined later.

As you mentioned in another comment, 'taskmanager.load-balance.mode' is
a user oriented configuration. The goal is to achieve load balance, while
the load can be defined as allocated slots or assigned tasks.
The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
use the mechanism of 'cluster.evenly-spread-out-slots' to help to achieve
balanced number of tasks. It's not perfect, but has acceptable effectiveness
and lower implementation complexity.

The 'Slots' mode is needed for compatible reasons. Users that are satisfied
with the current ability of 'cluster.evenly-spread-out-slots' can continue
using it after the config 'cluster.evenly-spread-out-slots' is deprecated.


>> Yuepeng
I think what users want is load balance. The combination is implementation
details and should be transparent to users.

Meanwhile, I think locality does not entirely conflict with load balance.
In fact,
they should be both considered when assigning tasks. Usually, state locality
should have the highest priority, and input locality can also be taken care
of when trying to balance tasks to slots and TMs. We can see that the most
important input locality, i.e. forward, is always covered in this FLIP when
computing slot sharing groups. It can be further optimized if we find it
problematic.

Thanks,
Zhu

Yangze Guo <karma...@gmail.com> 于2023年10月8日周日 13:53写道:

> Thanks for the updates, Rui.
>
> It does seem challenging to ensure evenness in slot deployment unless
> we introduce batch slot requests in SlotPool. However, one possibility
> is to add a delay of around 50ms during the SlotPool's resource
> requirement declaration to the ResourceManager, similar to the
> checkResourceRequirementsWithDelay in the SlotManager. In most cases,
> this delay would allow the SlotManager to see all resource
> requirements, then it can allocate the slot more evenly. As a side
> effect, it could also significantly reduce the number of RPC messages
> to the ResourceManager, which could become a single-point bottleneck
> in OLAP scenarios. WDYT?
>
> Best,
> Yangze Guo
>
> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Yangze,
> >
> > Thanks for your quick response!
> >
> > Sorry, I re-read the 2.2.2 part[1] about the Waiting Mechanism, I found
> > it isn't clear. The root cause of introducing the waiting mechanism is
> > that the slot requests are sent from JobMaster to SlotPool is
> > one by one instead of one whole batch. I have rewritten the 2.2.2 part,
> > please read it again in your free time.
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
> >
> > Best,
> > Rui
> >
> > On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo <karma...@gmail.com> wrote:
> >>
> >> Thanks for the clarification, Rui.
> >>
> >> I believe the root cause of this issue is that in the current
> >> DefaultResourceAllocationStrategy, slot allocation begins before the
> >> decision to PendingTaskManagers requesting is made. That can be fixed
> >> within the strategy without introducing another waiting mechanism. I
> >> think it would be better to address this issue within the scope of
> >> this FLIP. However, I don't have a strong opinion on it, it depends on
> >> your bandwidth.
> >>
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:
> >> >
> >> > Hi Yangze,
> >> >
> >> > > 2. From my understanding, if user enable the
> >> > > cluster.evenly-spread-out-slots,
> >> > > LeastUtilizationResourceMatchingStrategy will be used to determine
> the
> >> > > slot distribution and the slot allocation in the three TM will be
> >> > > (taskmanager.numberOfTaskSlots=3):
> >> > > TM1: 3 slot
> >> > > TM2: 2 slot
> >> > > TM3: 2 slot
> >> >
> >> > When all tms are ready in advance, the three TM will be:
> >> > TM1: 3 slot
> >> > TM2: 2 slot
> >> > TM3: 2 slot
> >> >
> >> > For application mode, the resource manager doesn't apply for
> >> > TM in advance, and slots aren't enough before the third TM is ready.
> >> > So all slots of the second TM will be used up. The three TM will be:
> >> > TM1: 3 slot
> >> > TM2: 3 slot
> >> > TM3: 1 slot
> >> >
> >> > That's why the FLIP add some notes:
> >> >
> >> > All free slots are in the last TM, because ResourceManager doesn’t
> have the waiting mechanism, and it just requests 7 slots for this JobMaster.
> >> > Why is it acceptable?
> >> >
> >> > If we just add the waiting mechanism to JobMaster but not in
> ResourceManager, all free slots will be in the last TM. All slots of other
> TMs are offered to JM.
> >> > That is, only one TM may have fewer tasks than the other TMs. The
> difference between the number of tasks of other TMs is at most 1.So When p
> >> slotsPerTM, the problem can be ignored.
> >> > We can also suggest users, in cases that p is small, it's better to
> configure slotsPerTM to 1, or let p % slotsPerTM == 0.
> >> >
> >> > Please correct me if my understanding is wrong, thanks~
> >> >
> >> > Best,
> >> > Rui
> >> >
> >> > On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo <karma...@gmail.com> wrote:
> >> >>
> >> >> Hi, Rui,
> >> >>
> >> >> 1. With the current mechanism, when physical slots are offered from
> >> >> TM, the JobMaster will start deploying tasks and synchronizing their
> >> >> states. With the addition of the waiting mechanism, IIUC, the
> >> >> JobMaster will deploy and synchronize the states of all tasks only
> >> >> after all resources are available. The task deployment and state
> >> >> synchronization both occupy the JobMaster's RPC main thread. In
> >> >> complex jobs with a lot of tasks, this waiting mechanism may increase
> >> >> the pressure on the JobMaster and increase the end-to-end job
> >> >> deployment time.
> >> >>
> >> >> 2. From my understanding, if user enable the
> >> >> cluster.evenly-spread-out-slots,
> >> >> LeastUtilizationResourceMatchingStrategy will be used to determine
> the
> >> >> slot distribution and the slot allocation in the three TM will be
> >> >> (taskmanager.numberOfTaskSlots=3):
> >> >> TM1: 3 slot
> >> >> TM2: 2 slot
> >> >> TM3: 2 slot
> >> >>
> >> >> Best,
> >> >> Yangze Guo
> >> >>
> >> >> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
> >> >> >
> >> >> > Hi Shammon,
> >> >> >
> >> >> > Thanks for your feedback as well!
> >> >> >
> >> >> > > IIUC, the overall balance is divided into two parts: slot to TM
> and task
> >> >> > to slot.
> >> >> > > 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> >> >> > > 2. Task to slot is guaranteed by the slot pool in JM
> >> >> > >
> >> >> > > These two are completely independent, what are the benefits of
> unifying
> >> >> > > these two into one option? Also, do we want to share the same
> >> >> > > option between SlotPool in JM and SlotManager in RM? This sounds
> a bit
> >> >> > > strange.
> >> >> >
> >> >> > Your understanding is totally right, the balance needs 2 parts:
> slot to TM
> >> >> > and task to slot.
> >> >> >
> >> >> > As I understand, the following are benefits of unifying them into
> one
> >> >> > option:
> >> >> >
> >> >> > - Flink users don't care about these principles inside of flink,
> they don't
> >> >> > know these 2 parts.
> >> >> > - If flink provides 2 options, flink users need to set 2 options
> for their
> >> >> > job.
> >> >> > - If one option is missed, the final result may not be good.
> (Users may
> >> >> > have questions when using)
> >> >> > - If flink just provides 1 option, enabling one option is enough.
> (Reduce
> >> >> > the probability of misconfiguration)
> >> >> >
> >> >> > Also, Flink’s options are user-oriented. Each option represents a
> switch or
> >> >> > parameter of a feature.
> >> >> > A feature may be composed of multiple components inside Flink.
> >> >> > It might be better to keep only one switch per feature.
> >> >> >
> >> >> > Actually, the cluster.evenly-spread-out-slots option is used
> between
> >> >> > SlotPool in JM and SlotManager in RM. 2 components to ensure
> >> >> > this feature works well.
> >> >> >
> >> >> > Please correct me if my understanding is wrong,
> >> >> > and looking forward to your feedback, thanks!
> >> >> >
> >> >> > Best,
> >> >> > Rui
> >> >> >
> >> >> > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com>
> wrote:
> >> >> >
> >> >> > > Hi Yangze,
> >> >> > >
> >> >> > > Thanks for your feedback!
> >> >> > >
> >> >> > > > 1. Is it possible for the SlotPool to get the slot allocation
> results
> >> >> > > > from the SlotManager in advance instead of waiting for the
> actual
> >> >> > > > physical slots to be registered, and perform pre-allocation?
> The
> >> >> > > > benefit of doing this is to make the task deployment process
> smoother,
> >> >> > > > especially when there are a large number of tasks in the job.
> >> >> > >
> >> >> > > Could you elaborate on that? I didn't understand what's the
> benefit and
> >> >> > > smoother.
> >> >> > >
> >> >> > > > 2. If user enable the cluster.evenly-spread-out-slots, the
> issue in
> >> >> > > > example 2 of section 2.2.3 can be resolved. Do I understand it
> >> >> > > > correctly?
> >> >> > >
> >> >> > > The example assigned result is the final allocation result when
> flink
> >> >> > > user enables the cluster.evenly-spread-out-slots. We think the
> >> >> > > assigned result is expected, so I think your understanding is
> right.
> >> >> > >
> >> >> > > Best,
> >> >> > > Rui
> >> >> > >
> >> >> > > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY <zjur...@gmail.com>
> wrote:
> >> >> > >
> >> >> > >> Thanks Yuepeng for initiating this discussion.
> >> >> > >>
> >> >> > >> +1 in general too, in fact we have implemented a similar
> mechanism
> >> >> > >> internally to ensure a balanced allocation of tasks to slots,
> it works
> >> >> > >> well.
> >> >> > >>
> >> >> > >> Some comments about the mechanism
> >> >> > >>
> >> >> > >> 1. This mechanism will be only supported in `SlotPool` or both
> `SlotPool`
> >> >> > >> and `DeclarativeSlotPool`? Currently the two slot pools are
> used in
> >> >> > >> different schedulers. I think this will also bring value to
> >> >> > >> `DeclarativeSlotPool`, but currently FLIP content seems to be
> based on
> >> >> > >> `SlotPool`, right?
> >> >> > >>
> >> >> > >> 2. In fine-grained resource management, we can set different
> resource
> >> >> > >> requirements for different nodes, which means that the
> resources of each
> >> >> > >> slot are different. What should be done when the slot selected
> by the
> >> >> > >> round-robin strategy cannot meet the resource requirements?
> Will this lead
> >> >> > >> to the failure of the balance strategy?
> >> >> > >>
> >> >> > >> 3. Is the assignment of tasks to slots balanced based on region
> or job
> >> >> > >> level? When multiple TMs fail over, will it cause the balancing
> strategy
> >> >> > >> to
> >> >> > >> fail or even worse? What is the current processing strategy?
> >> >> > >>
> >> >> > >> For Zhuzhu and Rui:
> >> >> > >>
> >> >> > >> IIUC, the overall balance is divided into two parts: slot to TM
> and task
> >> >> > >> to
> >> >> > >> slot.
> >> >> > >> 1. Slot to TM is guaranteed by SlotManager in ResourceManager
> >> >> > >> 2. Task to slot is guaranteed by the slot pool in JM
> >> >> > >>
> >> >> > >> These two are completely independent, what are the benefits of
> unifying
> >> >> > >> these two into one option? Also, do we want to share the same
> >> >> > >> option between SlotPool in JM and SlotManager in RM? This
> sounds a bit
> >> >> > >> strange.
> >> >> > >>
> >> >> > >> Best,
> >> >> > >> Shammon FY
> >> >> > >>
> >> >> > >>
> >> >> > >>
> >> >> > >> On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com>
> wrote:
> >> >> > >>
> >> >> > >> > Hi Zhu Zhu,
> >> >> > >> >
> >> >> > >> > Thanks for your feedback here!
> >> >> > >> >
> >> >> > >> > You are right, user needs to set 2 options:
> >> >> > >> > - cluster.evenly-spread-out-slots=true
> >> >> > >> > - slot.sharing-strategy=TASK_BALANCED_PREFERRED
> >> >> > >> >
> >> >> > >> > Update it to one option is useful at user side, so
> >> >> > >> > `taskmanager.load-balance.mode` sounds good to me.
> >> >> > >> > I want to check some points and behaviors about this option:
> >> >> > >> >
> >> >> > >> > 1. The default value is None, right?
> >> >> > >> > 2. When it's set to Tasks, how to assign slots to TM?
> >> >> > >> > - Option1: It's just check task number
> >> >> > >> > - Option2: It''s check the slot number first, then check the
> >> >> > >> > task number when the slot number is the same.
> >> >> > >> >
> >> >> > >> > Giving an example to explain what's the difference between
> them:
> >> >> > >> >
> >> >> > >> > - A session cluster has 2 flink jobs, they are jobA and jobB
> >> >> > >> > - Each TM has 4 slots.
> >> >> > >> > - The task number of one slot of jobA is 3
> >> >> > >> > - The task number of one slot of jobB is 1
> >> >> > >> > - We have 2 TaskManagers:
> >> >> > >> >   - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks
> >> >> > >> >   - tm2 runs 1 slot of jobA, and 1 slot of jobB, so tm2 runs
> 4 tasks.
> >> >> > >> >
> >> >> > >> > Now, we need to run a new slot, which tm should offer it?
> >> >> > >> > - Option1: If we just check the task number, the tm1 is
> better.
> >> >> > >> > - Option2: If we check the slot number first, and then check
> task, the
> >> >> > >> tm2
> >> >> > >> > is better
> >> >> > >> >
> >> >> > >> > The original FLIP selected option2, that's why we didn't add
> the
> >> >> > >> > third option. The option2 didn't break the semantics when
> >> >> > >> > `cluster.evenly-spread-out-slots` is true, and it just
> improve the
> >> >> > >> > behavior without the semantics is changed.
> >> >> > >> >
> >> >> > >> > In the other hands, if we choose option2, when user set
> >> >> > >> > `taskmanager.load-balance.mode` is Tasks. It also can achieve
> >> >> > >> > the goal when it's Slots.
> >> >> > >> >
> >> >> > >> > So I think the `Slots` enum isn't needed if we choose option2.
> >> >> > >> > Of course, If we choose the option1, the enum is needed.
> >> >> > >> >
> >> >> > >> > Looking forward to your feedback, thanks~
> >> >> > >> >
> >> >> > >> > Best,
> >> >> > >> > Rui
> >> >> > >> >
> >> >> > >> > On Wed, Sep 27, 2023 at 9:11 PM Zhu Zhu <reed...@gmail.com>
> wrote:
> >> >> > >> >
> >> >> > >> > > Thanks Yuepeng and Rui for creating this FLIP.
> >> >> > >> > >
> >> >> > >> > > +1 in general
> >> >> > >> > > The idea is straight forward: best-effort gather all the
> slot requests
> >> >> > >> > > and offered slots to form an overview before assigning
> slots, trying
> >> >> > >> to
> >> >> > >> > > balance the loads of task managers when assigning slots.
> >> >> > >> > >
> >> >> > >> > > I have one comment regarding the configuration for ease of
> use:
> >> >> > >> > >
> >> >> > >> > > IIUC, this FLIP uses an existing config
> >> >> > >> 'cluster.evenly-spread-out-slots'
> >> >> > >> > > as the main switch of the new feature. That is, from user
> perspective,
> >> >> > >> > > with this improvement, the
> 'cluster.evenly-spread-out-slots' feature
> >> >> > >> not
> >> >> > >> > > only balances the number of slots on task managers, but
> also balances
> >> >> > >> the
> >> >> > >> > > number of tasks. This is a behavior change anyway. Besides
> that, it
> >> >> > >> also
> >> >> > >> > > requires users to set 'slot.sharing-strategy' to
> >> >> > >> > 'TASK_BALANCED_PREFERRED'
> >> >> > >> > > to balance the tasks in each slot.
> >> >> > >> > >
> >> >> > >> > > I think we can introduce a new config option
> >> >> > >> > > `taskmanager.load-balance.mode`,
> >> >> > >> > > which accepts "None"/"Slots"/"Tasks".
> >> >> > >> `cluster.evenly-spread-out-slots`
> >> >> > >> > > can be superseded by the "Slots" mode and get deprecated.
> In the
> >> >> > >> future
> >> >> > >> > > it can support more mode, e.g. "CpuCores", to work better
> for jobs
> >> >> > >> with
> >> >> > >> > > fine-grained resources. The proposed config option
> >> >> > >> > > `slot.request.max-interval`
> >> >> > >> > > then can be renamed to
> >> >> > >> > > `taskmanager.load-balance.request-stablizing-timeout`
> >> >> > >> > > to show its relation with the feature. The proposed
> >> >> > >> > `slot.sharing-strategy`
> >> >> > >> > > is not needed, because the configured "Tasks" mode will do
> the work.
> >> >> > >> > >
> >> >> > >> > > WDYT?
> >> >> > >> > >
> >> >> > >> > > Thanks,
> >> >> > >> > > Zhu Zhu
> >> >> > >> > >
> >> >> > >> > > Yuepeng Pan <panyuep...@apache.org> 于2023年9月25日周一 16:26写道:
> >> >> > >> > >
> >> >> > >> > >> Hi all,
> >> >> > >> > >>
> >> >> > >> > >>
> >> >> > >> > >> I and Fan Rui(CC’ed) created the FLIP-370[1] to support
> balanced
> >> >> > >> tasks
> >> >> > >> > >> scheduling.
> >> >> > >> > >>
> >> >> > >> > >>
> >> >> > >> > >> The current strategy of Flink to deploy tasks sometimes
> leads some
> >> >> > >> > >> TMs(TaskManagers) to have more tasks while others have
> fewer tasks,
> >> >> > >> > >> resulting in excessive resource utilization at some TMs
> that contain
> >> >> > >> > more
> >> >> > >> > >> tasks and becoming a bottleneck for the entire job
> processing.
> >> >> > >> > Developing
> >> >> > >> > >> strategies to achieve task load balancing for TMs and
> reducing job
> >> >> > >> > >> bottlenecks becomes very meaningful.
> >> >> > >> > >>
> >> >> > >> > >>
> >> >> > >> > >> The raw design and discussions could be found in the Flink
> JIRA[2]
> >> >> > >> and
> >> >> > >> > >> Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for
> providing some
> >> >> > >> > >> valuable help and suggestions in advance.
> >> >> > >> > >>
> >> >> > >> > >>
> >> >> > >> > >> Please refer to the FLIP[1] document for more details
> about the
> >> >> > >> proposed
> >> >> > >> > >> design and implementation. We welcome any feedback and
> opinions on
> >> >> > >> this
> >> >> > >> > >> proposal.
> >> >> > >> > >>
> >> >> > >> > >>
> >> >> > >> > >> [1]
> >> >> > >> > >>
> >> >> > >> >
> >> >> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
> >> >> > >> > >>
> >> >> > >> > >> [2] https://issues.apache.org/jira/browse/FLINK-31757
> >> >> > >> > >>
> >> >> > >> > >> [3]
> >> >> > >> > >>
> >> >> > >> >
> >> >> > >>
> https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
> >> >> > >> > >>
> >> >> > >> > >>
> >> >> > >> > >> Best,
> >> >> > >> > >>
> >> >> > >> > >> Yuepeng Pan
> >> >> > >> > >>
> >> >> > >> > >
> >> >> > >> >
> >> >> > >>
> >> >> > >
>

Reply via email to