Thanks for creating the implementation plan Xintong. Overall, the
implementation plan looks good. I had a couple of comments:

- What will happen if a user has defined a streaming job with two slot
sharing groups? Would the code insert a blocking data exchange between
these two groups? If yes, then this breaks existing Flink streaming jobs.
- How do we detect unbounded streaming jobs to set
the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set
it false if we are using the DataSet API or the Blink planner with a
bounded job?

Cheers,
Till

On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org> wrote:

> I guess there is a typo since the link to the FLIP-53 is
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>
> Cheers,
> Till
>
> On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
>> Added implementation steps for this FLIP on the wiki page [1].
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>
>> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>> > Hi everyone,
>> >
>> > As Till suggested, the original "FLIP-53: Fine Grained Resource
>> > Management" splits into two separate FLIPs,
>> >
>> >    - FLIP-53: Fine Grained Operator Resource Management [1]
>> >    - FLIP-56: Dynamic Slot Allocation [2]
>> >
>> > We'll continue using this discussion thread for FLIP-53. For FLIP-56, I
>> > just started a new discussion thread [3].
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>> >
>> > [2]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>> >
>> > [3]
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
>> >
>> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com>
>> > wrote:
>> >
>> >> Thinks for the comments, Yang.
>> >>
>> >> Regarding your questions:
>> >>
>> >>    1. How to calculate the resource specification of TaskManagers? Do
>> they
>> >>>    have them same resource spec calculated based on the
>> configuration? I
>> >>> think
>> >>>    we still have wasted resources in this situation. Or we could start
>> >>>    TaskManagers with different spec.
>> >>>
>> >> I agree with you that we can further improve the resource utility by
>> >> customizing task executors with different resource specifications.
>> However,
>> >> I'm in favor of limiting the scope of this FLIP and leave it as a
>> future
>> >> optimization. The plan for that part is to move the logic of deciding
>> task
>> >> executor specifications into the slot manager and make slot manager
>> >> pluggable, so inside the slot manager plugin we can have different
>> logics
>> >> for deciding the task executor specifications.
>> >>
>> >>
>> >>>    2. If a slot is released and returned to SlotPool, does it could be
>> >>>    reused by other SlotRequest that the request resource is smaller
>> than
>> >>> it?
>> >>>
>> >> No, I think slot pool should always return slots if they do not exactly
>> >> match the pending requests, so that resource manager can deal with the
>> >> extra resources.
>> >>
>> >>>       - If it is yes, what happens to the available resource in the
>> >>
>> >>       TaskManager.
>> >>>       - What is the SlotStatus of the cached slot in SlotPool? The
>> >>>       AllocationId is null?
>> >>>
>> >> The allocation id does not change as long as the slot is not returned
>> >> from the job master, no matter its occupied or available in the slot
>> pool.
>> >> I think we have the same behavior currently. No matter how many tasks
>> the
>> >> job master deploy into the slot, concurrently or sequentially, it is
>> one
>> >> allocation from the cluster to the job until the slot is freed from
>> the job
>> >> master.
>> >>
>> >>>    3. In a session cluster, some jobs are configured with operator
>> >>>    resources, meanwhile other jobs are using UNKNOWN. How to deal with
>> >>> this
>> >>>    situation?
>> >>
>> >> As long as we do not mix unknown / specified resource profiles within
>> the
>> >> same job / slot, there shouldn't be a problem. Resource manager
>> converts
>> >> unknown resource profiles in slot requests to specified default
>> resource
>> >> profiles, so they can be dynamically allocated from task executors'
>> >> available resources just as other slot requests with specified resource
>> >> profiles.
>> >>
>> >> Thank you~
>> >>
>> >> Xintong Song
>> >>
>> >>
>> >>
>> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com>
>> wrote:
>> >>
>> >>> Hi Xintong,
>> >>>
>> >>>
>> >>> Thanks for your detailed proposal. I think many users are suffering
>> from
>> >>> waste of resources. The resource spec of all task managers are same
>> and
>> >>> we
>> >>> have to increase all task managers to make the heavy one more stable.
>> So
>> >>> we
>> >>> will benefit from the fine grained resource management a lot. We could
>> >>> get
>> >>> better resource utilization and stability.
>> >>>
>> >>>
>> >>> Just to share some thoughts.
>> >>>
>> >>>
>> >>>
>> >>>    1. How to calculate the resource specification of TaskManagers? Do
>> >>> they
>> >>>    have them same resource spec calculated based on the
>> configuration? I
>> >>> think
>> >>>    we still have wasted resources in this situation. Or we could start
>> >>>    TaskManagers with different spec.
>> >>>    2. If a slot is released and returned to SlotPool, does it could be
>> >>>    reused by other SlotRequest that the request resource is smaller
>> than
>> >>> it?
>> >>>       - If it is yes, what happens to the available resource in the
>> >>>       TaskManager.
>> >>>       - What is the SlotStatus of the cached slot in SlotPool? The
>> >>>       AllocationId is null?
>> >>>    3. In a session cluster, some jobs are configured with operator
>> >>>    resources, meanwhile other jobs are using UNKNOWN. How to deal with
>> >>> this
>> >>>    situation?
>> >>>
>> >>>
>> >>>
>> >>> Best,
>> >>> Yang
>> >>>
>> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五 下午8:57写道:
>> >>>
>> >>> > Thanks for the feedbacks, Yangze and Till.
>> >>> >
>> >>> > Yangze,
>> >>> >
>> >>> > I agree with you that we should make scheduling strategy pluggable
>> and
>> >>> > optimize the strategy to reduce the memory fragmentation problem,
>> and
>> >>> > thanks for the inputs on the potential algorithmic solutions.
>> However,
>> >>> I'm
>> >>> > in favor of keep this FLIP focusing on the overall mechanism design
>> >>> rather
>> >>> > than strategies. Solving the fragmentation issue should be
>> considered
>> >>> as an
>> >>> > optimization, and I agree with Till that we probably should tackle
>> this
>> >>> > afterwards.
>> >>> >
>> >>> > Till,
>> >>> >
>> >>> > - Regarding splitting the FLIP, I think it makes sense. The operator
>> >>> > resource management and dynamic slot allocation do not have much
>> >>> dependency
>> >>> > on each other.
>> >>> >
>> >>> > - Regarding the default slot size, I think this is similar to
>> FLIP-49
>> >>> [1]
>> >>> > where we want all the deriving happens at one place. I think it
>> would
>> >>> be
>> >>> > nice to pass the default slot size into the task executor in the
>> same
>> >>> way
>> >>> > that we pass in the memory pool sizes in FLIP-49 [1].
>> >>> >
>> >>> > - Regarding the return value of
>> TaskExecutorGateway#requestResource, I
>> >>> > think you're right. We should avoid using null as the return value.
>> I
>> >>> think
>> >>> > we probably should thrown an exception here.
>> >>> >
>> >>> > Thank you~
>> >>> >
>> >>> > Xintong Song
>> >>> >
>> >>> >
>> >>> > [1]
>> >>> >
>> >>> >
>> >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>> >>> >
>> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <trohrm...@apache.org
>> >
>> >>> > wrote:
>> >>> >
>> >>> > > Hi Xintong,
>> >>> > >
>> >>> > > thanks for drafting this FLIP. I think your proposal helps to
>> >>> improve the
>> >>> > > execution of batch jobs more efficiently. Moreover, it enables the
>> >>> proper
>> >>> > > integration of the Blink planner which is very important as well.
>> >>> > >
>> >>> > > Overall, the FLIP looks good to me. I was wondering whether it
>> >>> wouldn't
>> >>> > > make sense to actually split it up into two FLIPs: Operator
>> resource
>> >>> > > management and dynamic slot allocation. I think these two FLIPs
>> >>> could be
>> >>> > > seen as orthogonal and it would decrease the scope of each
>> individual
>> >>> > FLIP.
>> >>> > >
>> >>> > > Some smaller comments:
>> >>> > >
>> >>> > > - I'm not sure whether we should pass in the default slot size
>> via an
>> >>> > > environment variable. Without having unified the way how Flink
>> >>> components
>> >>> > > are configured [1], I think it would be better to pass it in as
>> part
>> >>> of
>> >>> > the
>> >>> > > configuration.
>> >>> > > - I would avoid returning a null value from
>> >>> > > TaskExecutorGateway#requestResource if it cannot be fulfilled.
>> >>> Either we
>> >>> > > should introduce an explicit return value saying this or throw an
>> >>> > > exception.
>> >>> > >
>> >>> > > Concerning Yangze's comments: I think you are right that it would
>> be
>> >>> > > helpful to make the selection strategy pluggable. Also batching
>> slot
>> >>> > > requests to the RM could be a good optimization. For the sake of
>> >>> keeping
>> >>> > > the scope of this FLIP smaller I would try to tackle these things
>> >>> after
>> >>> > the
>> >>> > > initial version has been completed (without spoiling these
>> >>> optimization
>> >>> > > opportunities). In particular batching the slot requests depends
>> on
>> >>> the
>> >>> > > current scheduler refactoring and could also be realized on the RM
>> >>> side
>> >>> > > only.
>> >>> > >
>> >>> > > [1]
>> >>> > >
>> >>> > >
>> >>> >
>> >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>> >>> > >
>> >>> > > Cheers,
>> >>> > > Till
>> >>> > >
>> >>> > >
>> >>> > >
>> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com>
>> >>> wrote:
>> >>> > >
>> >>> > > > Hi, Xintong
>> >>> > > >
>> >>> > > > Thanks to propose this FLIP. The general design looks good to
>> me,
>> >>> +1
>> >>> > > > for this feature.
>> >>> > > >
>> >>> > > > Since slots in the same task executor could have different
>> resource
>> >>> > > > profile, we will
>> >>> > > > meet resource fragment problem. Think about this case:
>> >>> > > >  - request A want 1G memory while request B & C want 0.5G memory
>> >>> > > >  - There are two task executors T1 & T2 with 1G and 0.5G free
>> >>> memory
>> >>> > > > respectively
>> >>> > > > If B come first and we cut a slot from T1 for B, A must wait for
>> >>> the
>> >>> > > > free resource from
>> >>> > > > other task. But A could have been scheduled immediately if we
>> cut a
>> >>> > > > slot from T2 for B.
>> >>> > > >
>> >>> > > > The logic of findMatchingSlot now become finding a task executor
>> >>> which
>> >>> > > > has enough
>> >>> > > > resource and then cut a slot from it. Current method could be
>> seen
>> >>> as
>> >>> > > > "First-fit strategy",
>> >>> > > > which works well in general but sometimes could not be the
>> >>> optimization
>> >>> > > > method.
>> >>> > > >
>> >>> > > > Actually, this problem could be abstracted as "Bin Packing
>> >>> Problem"[1].
>> >>> > > > Here are
>> >>> > > > some common approximate algorithms:
>> >>> > > > - First fit
>> >>> > > > - Next fit
>> >>> > > > - Best fit
>> >>> > > >
>> >>> > > > But it become multi-dimensional bin packing problem if we take
>> CPU
>> >>> > > > into account. It hard
>> >>> > > > to define which one is best fit now. Some research addressed
>> this
>> >>> > > > problem, such like Tetris[2].
>> >>> > > >
>> >>> > > > Here are some thinking about it:
>> >>> > > > 1. We could make the strategy of finding matching task executor
>> >>> > > > pluginable. Let user to config the
>> >>> > > > best strategy in their scenario.
>> >>> > > > 2. We could support batch request interface in RM, because we
>> have
>> >>> > > > opportunities to optimize
>> >>> > > > if we have more information. If we know the A, B, C at the same
>> >>> time,
>> >>> > > > we could always make the best decision.
>> >>> > > >
>> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
>> >>> > > > [2]
>> >>> >
>> https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
>> >>> > > >
>> >>> > > > Best,
>> >>> > > > Yangze Guo
>> >>> > > >
>> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <
>> >>> tonysong...@gmail.com>
>> >>> > > > wrote:
>> >>> > > > >
>> >>> > > > > Hi everyone,
>> >>> > > > >
>> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine
>> >>> Grained
>> >>> > > > > Resource Management"[1], where we propose how to improve Flink
>> >>> > resource
>> >>> > > > > management and scheduling.
>> >>> > > > >
>> >>> > > > > This FLIP mainly discusses the following issues.
>> >>> > > > >
>> >>> > > > >    - How to support tasks with fine grained resource
>> >>> requirements.
>> >>> > > > >    - How to unify resource management for jobs with / without
>> >>> fine
>> >>> > > > grained
>> >>> > > > >    resource requirements.
>> >>> > > > >    - How to unify resource management for streaming / batch
>> jobs.
>> >>> > > > >
>> >>> > > > > Key changes proposed in the FLIP are as follows.
>> >>> > > > >
>> >>> > > > >    - Unify memory management for operators with / without fine
>> >>> > grained
>> >>> > > > >    resource requirements by applying a fraction based quota
>> >>> > mechanism.
>> >>> > > > >    - Unify resource scheduling for streaming and batch jobs by
>> >>> > setting
>> >>> > > > slot
>> >>> > > > >    sharing groups for pipelined regions during compiling
>> stage.
>> >>> > > > >    - Dynamically allocate slots from task executors' available
>> >>> > > resources.
>> >>> > > > >
>> >>> > > > > Please find more details in the FLIP wiki document [1].
>> Looking
>> >>> > forward
>> >>> > > > to
>> >>> > > > > your feedbacks.
>> >>> > > > >
>> >>> > > > > Thank you~
>> >>> > > > >
>> >>> > > > > Xintong Song
>> >>> > > > >
>> >>> > > > >
>> >>> > > > > [1]
>> >>> > > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>>
>

Reply via email to