Thanks Xintong for proposing this improvement. Fine grained resources can
be very helpful when users plan their resource usage carefully.

I have a few questions:
1. Currently in a batch job, vertices from different regions can run at the
same time in slots from the same slot sharing group, as long as they have no
data dependency on each other and the available slot count is not smaller
than the *max* parallelism over all tasks.
With the changes in this FLIP, however, tasks from different regions can no
longer share slots.
Once the available slot count is smaller than the *sum* of the parallelisms
of the tasks from all regions, tasks may need to be executed sequentially,
which might cause a performance regression.
Is this (a performance regression for existing DataSet jobs) considered a
necessary and accepted trade-off in this FLIP?

2. The network memory depends on the input/output ExecutionEdge count and
thus can differ even between parallel instances of the same JobVertex.
Does this mean that, when adding up task resources to calculate the slot
resource of a sharing group, the maximum possible network memory over all
instances of the vertex should be used?
This might result in more resources being requested than are actually needed.
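To illustrate what I mean, here is a rough sketch with hypothetical edge
counts and a made-up per-edge buffer budget (not actual Flink code or
configuration values):

import java.util.Arrays;

public class NetworkMemorySketch {
    public static void main(String[] args) {
        // Input/output ExecutionEdge counts may differ across parallel
        // instances of the same JobVertex (e.g. with POINTWISE distribution).
        int[] edgesPerInstance = {2, 2, 3};
        long bytesPerEdge = 32 * 1024L; // hypothetical per-edge buffer budget

        // If the sharing group's slot resource has to cover every instance,
        // the instance with the most edges drives the estimate ...
        long maxEstimate = Arrays.stream(edgesPerInstance).max().getAsInt() * bytesPerEdge;

        // ... which over-provisions the instances that have fewer edges.
        double avgActual = Arrays.stream(edgesPerInstance).average().getAsDouble() * bytesPerEdge;

        System.out.println("max-based estimate per slot:  " + maxEstimate + " bytes");
        System.out.println("average actual need per slot: " + avgActual + " bytes");
    }
}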

And some minor comments:
1. Regarding "fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemory", I
guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup ?
2. I think the *StreamGraphGenerator* in the #Slot Sharing section and in
implementation step 4 should be *StreamingJobGraphGenerator*, since
*StreamGraphGenerator* is not aware of the JobGraph and pipelined regions.
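For reference, this is how I read the fraction formula; the operator count
below is made up purely for illustration, and the variable names follow the
FLIP's wording rather than any real class:

public class ManagedMemoryFractionSketch {
    public static void main(String[] args) {
        // Suppose 4 operators in the same slot sharing group use on-heap
        // managed memory (hypothetical count, just for illustration).
        int numOpsUseOnHeapManagedMemoryInTheSameSharedGroup = 4;

        // Each of them would then get an equal fraction of the slot's
        // on-heap managed memory.
        double fracManagedMemOnHeap = 1.0 / numOpsUseOnHeapManagedMemoryInTheSameSharedGroup;

        System.out.println("fracManagedMemOnHeap = " + fracManagedMemOnHeap); // 0.25
    }
}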


Thanks,
Zhu Zhu

On Mon, Sep 2, 2019 at 11:59 AM Xintong Song <tonysong...@gmail.com> wrote:

> Updated the FLIP wiki page [1], with the following changes.
>
>    - Remove the step of converting pipelined edges between different slot
>    sharing groups into blocking edges.
>    - Set `allSourcesInSamePipelinedRegion` to true by default.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com>
> wrote:
>
> > Regarding changing edge type, I think actually we don't need to do this
> > for batch jobs neither, because we don't have public interfaces for users
> > to explicitly set slot sharing groups in DataSet API and SQL/Table API.
> > We have such interfaces in DataStream API only.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> >> Thanks for the correction, Till.
> >>
> >> Regarding your comments:
> >> - You are right, we should not change the edge type for streaming jobs.
> >> Then I think we can change the option 'allSourcesInSamePipelinedRegion'
> >> in step 2 to 'isStreamingJob', and implement the current step 2 before the
> >> current step 1 so we can use this option to decide whether should change
> >> the edge type. What do you think?
> >> - Agree. It should be easier to make the default value of
> >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set
> >> it to 'false' when using DataSet API or blink planner.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org>
> >> wrote:
> >>
> >>> Thanks for creating the implementation plan Xintong. Overall, the
> >>> implementation plan looks good. I had a couple of comments:
> >>>
> >>> - What will happen if a user has defined a streaming job with two slot
> >>> sharing groups? Would the code insert a blocking data exchange between
> >>> these two groups? If yes, then this breaks existing Flink streaming jobs.
> >>> - How do we detect unbounded streaming jobs to set
> >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to
> >>> set it false if we are using the DataSet API or the Blink planner with a
> >>> bounded job?
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org>
> >>> wrote:
> >>>
> >>> > I guess there is a typo since the link to the FLIP-53 is
> >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> >>> >
> >>> > Cheers,
> >>> > Till
> >>> >
> >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com>
> >>> > wrote:
> >>> >
> >>> >> Added implementation steps for this FLIP on the wiki page [1].
> >>> >>
> >>> >>
> >>> >> Thank you~
> >>> >>
> >>> >> Xintong Song
> >>> >>
> >>> >>
> >>> >> [1]
> >>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >>> >>
> >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com>
> >>> >> wrote:
> >>> >>
> >>> >> > Hi everyone,
> >>> >> >
> >>> >> > As Till suggested, the original "FLIP-53: Fine Grained Resource
> >>> >> > Management" splits into two separate FLIPs,
> >>> >> >
> >>> >> >    - FLIP-53: Fine Grained Operator Resource Management [1]
> >>> >> >    - FLIP-56: Dynamic Slot Allocation [2]
> >>> >> >
> >>> >> > We'll continue using this discussion thread for FLIP-53. For FLIP-56,
> >>> >> > I just started a new discussion thread [3].
> >>> >> >
> >>> >> > Thank you~
> >>> >> >
> >>> >> > Xintong Song
> >>> >> >
> >>> >> >
> >>> >> > [1]
> >>> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> >>> >> >
> >>> >> > [2]
> >>> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> >>> >> >
> >>> >> > [3]
> >>> >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
> >>> >> >
> >>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com>
> >>> >> > wrote:
> >>> >> >
> >>> >> >> Thanks for the comments, Yang.
> >>> >> >>
> >>> >> >> Regarding your questions:
> >>> >> >>
> >>> >> >>>    1. How to calculate the resource specification of TaskManagers? Do
> >>> >> >>>    they have them same resource spec calculated based on the
> >>> >> >>>    configuration? I think we still have wasted resources in this
> >>> >> >>>    situation. Or we could start TaskManagers with different spec.
> >>> >> >>>
> >>> >> >> I agree with you that we can further improve the resource utility by
> >>> >> >> customizing task executors with different resource specifications.
> >>> >> >> However, I'm in favor of limiting the scope of this FLIP and leave it
> >>> >> >> as a future optimization. The plan for that part is to move the logic
> >>> >> >> of deciding task executor specifications into the slot manager and
> >>> >> >> make slot manager pluggable, so inside the slot manager plugin we can
> >>> >> >> have different logics for deciding the task executor specifications.
> >>> >> >>
> >>> >> >>
> >>> >> >>>    2. If a slot is released and returned to SlotPool, does it could be
> >>> >> >>>    reused by other SlotRequest that the request resource is smaller
> >>> >> >>>    than it?
> >>> >> >>>
> >>> >> >> No, I think slot pool should always return slots if they do not
> >>> >> >> exactly match the pending requests, so that resource manager can deal
> >>> >> >> with the extra resources.
> >>> >> >>
> >>> >> >>>       - If it is yes, what happens to the available resource in the
> >>> >> >>>       TaskManager.
> >>> >> >>>       - What is the SlotStatus of the cached slot in SlotPool? The
> >>> >> >>>       AllocationId is null?
> >>> >> >>>
> >>> >> >> The allocation id does not change as long as the slot is not returned
> >>> >> >> from the job master, no matter its occupied or available in the slot
> >>> >> >> pool. I think we have the same behavior currently. No matter how many
> >>> >> >> tasks the job master deploy into the slot, concurrently or
> >>> >> >> sequentially, it is one allocation from the cluster to the job until
> >>> >> >> the slot is freed from the job master.
> >>> >> >>
> >>> >> >>>    3. In a session cluster, some jobs are configured with operator
> >>> >> >>>    resources, meanwhile other jobs are using UNKNOWN. How to deal
> >>> >> >>>    with this situation?
> >>> >> >>
> >>> >> >> As long as we do not mix unknown / specified resource profiles within
> >>> >> >> the same job / slot, there shouldn't be a problem. Resource manager
> >>> >> >> converts unknown resource profiles in slot requests to specified
> >>> >> >> default resource profiles, so they can be dynamically allocated from
> >>> >> >> task executors' available resources just as other slot requests with
> >>> >> >> specified resource profiles.
> >>> >> >>
> >>> >> >> Thank you~
> >>> >> >>
> >>> >> >> Xintong Song
> >>> >> >>
> >>> >> >>
> >>> >> >>
> >>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com>
> >>> >> >> wrote:
> >>> >> >>
> >>> >> >>> Hi Xintong,
> >>> >> >>>
> >>> >> >>>
> >>> >> >>> Thanks for your detailed proposal. I think many users are suffering
> >>> >> >>> from waste of resources. The resource spec of all task managers are
> >>> >> >>> same and we have to increase all task managers to make the heavy one
> >>> >> >>> more stable. So we will benefit from the fine grained resource
> >>> >> >>> management a lot. We could get better resource utilization and
> >>> >> >>> stability.
> >>> >> >>>
> >>> >> >>>
> >>> >> >>> Just to share some thoughts.
> >>> >> >>>
> >>> >> >>>
> >>> >> >>>
> >>> >> >>>    1. How to calculate the resource specification of TaskManagers? Do
> >>> >> >>>    they have them same resource spec calculated based on the
> >>> >> >>>    configuration? I think we still have wasted resources in this
> >>> >> >>>    situation. Or we could start TaskManagers with different spec.
> >>> >> >>>    2. If a slot is released and returned to SlotPool, does it could be
> >>> >> >>>    reused by other SlotRequest that the request resource is smaller
> >>> >> >>>    than it?
> >>> >> >>>       - If it is yes, what happens to the available resource in the
> >>> >> >>>       TaskManager.
> >>> >> >>>       - What is the SlotStatus of the cached slot in SlotPool? The
> >>> >> >>>       AllocationId is null?
> >>> >> >>>    3. In a session cluster, some jobs are configured with operator
> >>> >> >>>    resources, meanwhile other jobs are using UNKNOWN. How to deal
> >>> >> >>>    with this situation?
> >>> >> >>>
> >>> >> >>>
> >>> >> >>>
> >>> >> >>> Best,
> >>> >> >>> Yang
> >>> >> >>>
> >>> >> >>> On Fri, Aug 16, 2019 at 8:57 PM Xintong Song <tonysong...@gmail.com> wrote:
> >>> >> >>>
> >>> >> >>> > Thanks for the feedbacks, Yangze and Till.
> >>> >> >>> >
> >>> >> >>> > Yangze,
> >>> >> >>> >
> >>> >> >>> > I agree with you that we should make scheduling strategy pluggable
> >>> >> >>> > and optimize the strategy to reduce the memory fragmentation
> >>> >> >>> > problem, and thanks for the inputs on the potential algorithmic
> >>> >> >>> > solutions. However, I'm in favor of keep this FLIP focusing on the
> >>> >> >>> > overall mechanism design rather than strategies. Solving the
> >>> >> >>> > fragmentation issue should be considered as an optimization, and I
> >>> >> >>> > agree with Till that we probably should tackle this afterwards.
> >>> >> >>> >
> >>> >> >>> > Till,
> >>> >> >>> >
> >>> >> >>> > - Regarding splitting the FLIP, I think it makes sense. The
> >>> >> >>> > operator resource management and dynamic slot allocation do not
> >>> >> >>> > have much dependency on each other.
> >>> >> >>> >
> >>> >> >>> > - Regarding the default slot size, I think this is similar to
> >>> >> >>> > FLIP-49 [1] where we want all the deriving happens at one place. I
> >>> >> >>> > think it would be nice to pass the default slot size into the task
> >>> >> >>> > executor in the same way that we pass in the memory pool sizes in
> >>> >> >>> > FLIP-49 [1].
> >>> >> >>> >
> >>> >> >>> > - Regarding the return value of TaskExecutorGateway#requestResource,
> >>> >> >>> > I think you're right. We should avoid using null as the return
> >>> >> >>> > value. I think we probably should thrown an exception here.
> >>> >> >>> >
> >>> >> >>> > Thank you~
> >>> >> >>> >
> >>> >> >>> > Xintong Song
> >>> >> >>> >
> >>> >> >>> >
> >>> >> >>> > [1]
> >>> >> >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >>> >> >>> >
> >>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <trohrm...@apache.org>
> >>> >> >>> > wrote:
> >>> >> >>> >
> >>> >> >>> > > Hi Xintong,
> >>> >> >>> > >
> >>> >> >>> > > thanks for drafting this FLIP. I think your proposal helps to
> >>> >> >>> > > improve the execution of batch jobs more efficiently. Moreover,
> >>> >> >>> > > it enables the proper integration of the Blink planner which is
> >>> >> >>> > > very important as well.
> >>> >> >>> > >
> >>> >> >>> > > Overall, the FLIP looks good to me. I was wondering whether it
> >>> >> >>> > > wouldn't make sense to actually split it up into two FLIPs:
> >>> >> >>> > > Operator resource management and dynamic slot allocation. I think
> >>> >> >>> > > these two FLIPs could be seen as orthogonal and it would decrease
> >>> >> >>> > > the scope of each individual FLIP.
> >>> >> >>> > >
> >>> >> >>> > > Some smaller comments:
> >>> >> >>> > >
> >>> >> >>> > > - I'm not sure whether we should pass in the default slot size
> >>> >> >>> > > via an environment variable. Without having unified the way how
> >>> >> >>> > > Flink components are configured [1], I think it would be better
> >>> >> >>> > > to pass it in as part of the configuration.
> >>> >> >>> > > - I would avoid returning a null value from
> >>> >> >>> > > TaskExecutorGateway#requestResource if it cannot be fulfilled.
> >>> >> >>> > > Either we should introduce an explicit return value saying this
> >>> >> >>> > > or throw an exception.
> >>> >> >>> > >
> >>> >> >>> > > Concerning Yangze's comments: I think you are right that it would
> >>> >> >>> > > be helpful to make the selection strategy pluggable. Also batching
> >>> >> >>> > > slot requests to the RM could be a good optimization. For the sake
> >>> >> >>> > > of keeping the scope of this FLIP smaller I would try to tackle
> >>> >> >>> > > these things after the initial version has been completed (without
> >>> >> >>> > > spoiling these optimization opportunities). In particular batching
> >>> >> >>> > > the slot requests depends on the current scheduler refactoring and
> >>> >> >>> > > could also be realized on the RM side only.
> >>> >> >>> > >
> >>> >> >>> > > [1]
> >>> >> >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >>> >> >>> > >
> >>> >> >>> > > Cheers,
> >>> >> >>> > > Till
> >>> >> >>> > >
> >>> >> >>> > >
> >>> >> >>> > >
> >>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com>
> >>> >> >>> > > wrote:
> >>> >> >>> > >
> >>> >> >>> > > > Hi, Xintong
> >>> >> >>> > > >
> >>> >> >>> > > > Thanks to propose this FLIP. The general design looks good to
> >>> >> >>> > > > me, +1 for this feature.
> >>> >> >>> > > >
> >>> >> >>> > > > Since slots in the same task executor could have different
> >>> >> >>> > > > resource profile, we will meet resource fragment problem. Think
> >>> >> >>> > > > about this case:
> >>> >> >>> > > >  - request A want 1G memory while request B & C want 0.5G memory
> >>> >> >>> > > >  - There are two task executors T1 & T2 with 1G and 0.5G free
> >>> >> >>> > > >    memory respectively
> >>> >> >>> > > > If B come first and we cut a slot from T1 for B, A must wait for
> >>> >> >>> > > > the free resource from other task. But A could have been
> >>> >> >>> > > > scheduled immediately if we cut a slot from T2 for B.
> >>> >> >>> > > >
> >>> >> >>> > > > The logic of findMatchingSlot now become finding a task executor
> >>> >> >>> > > > which has enough resource and then cut a slot from it. Current
> >>> >> >>> > > > method could be seen as "First-fit strategy", which works well
> >>> >> >>> > > > in general but sometimes could not be the optimization method.
> >>> >> >>> > > >
> >>> >> >>> > > > Actually, this problem could be abstracted as "Bin Packing
> >>> >> >>> > > > Problem"[1]. Here are some common approximate algorithms:
> >>> >> >>> > > > - First fit
> >>> >> >>> > > > - Next fit
> >>> >> >>> > > > - Best fit
> >>> >> >>> > > >
> >>> >> >>> > > > But it become multi-dimensional bin packing problem if we take
> >>> >> >>> > > > CPU into account. It hard to define which one is best fit now.
> >>> >> >>> > > > Some research addressed this problem, such like Tetris[2].
> >>> >> >>> > > >
> >>> >> >>> > > > Here are some thinking about it:
> >>> >> >>> > > > 1. We could make the strategy of finding matching task executor
> >>> >> >>> > > > pluginable. Let user to config the best strategy in their
> >>> >> >>> > > > scenario.
> >>> >> >>> > > > 2. We could support batch request interface in RM, because we
> >>> >> >>> > > > have opportunities to optimize if we have more information. If
> >>> >> >>> > > > we know the A, B, C at the same time, we could always make the
> >>> >> >>> > > > best decision.
> >>> >> >>> > > >
> >>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
> >>> >> >>> > > > [2] https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
> >>> >> >>> > > >
> >>> >> >>> > > > Best,
> >>> >> >>> > > > Yangze Guo
> >>> >> >>> > > >
> >>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <tonysong...@gmail.com>
> >>> >> >>> > > > wrote:
> >>> >> >>> > > > >
> >>> >> >>> > > > > Hi everyone,
> >>> >> >>> > > > >
> >>> >> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine
> >>> >> >>> > > > > Grained Resource Management"[1], where we propose how to
> >>> >> >>> > > > > improve Flink resource management and scheduling.
> >>> >> >>> > > > >
> >>> >> >>> > > > > This FLIP mainly discusses the following issues.
> >>> >> >>> > > > >
> >>> >> >>> > > > >    - How to support tasks with fine grained resource
> >>> >> >>> > > > >    requirements.
> >>> >> >>> > > > >    - How to unify resource management for jobs with / without
> >>> >> >>> > > > >    fine grained resource requirements.
> >>> >> >>> > > > >    - How to unify resource management for streaming / batch
> >>> >> >>> > > > >    jobs.
> >>> >> >>> > > > >
> >>> >> >>> > > > > Key changes proposed in the FLIP are as follows.
> >>> >> >>> > > > >
> >>> >> >>> > > > >    - Unify memory management for operators with / without fine
> >>> >> >>> > > > >    grained resource requirements by applying a fraction based
> >>> >> >>> > > > >    quota mechanism.
> >>> >> >>> > > > >    - Unify resource scheduling for streaming and batch jobs by
> >>> >> >>> > > > >    setting slot sharing groups for pipelined regions during
> >>> >> >>> > > > >    compiling stage.
> >>> >> >>> > > > >    - Dynamically allocate slots from task executors' available
> >>> >> >>> > > > >    resources.
> >>> >> >>> > > > >
> >>> >> >>> > > > > Please find more details in the FLIP wiki document [1].
> >>> >> >>> > > > > Looking forward to your feedbacks.
> >>> >> >>> > > > >
> >>> >> >>> > > > > Thank you~
> >>> >> >>> > > > >
> >>> >> >>> > > > > Xintong Song
> >>> >> >>> > > > >
> >>> >> >>> > > > >
> >>> >> >>> > > > > [1]
> >>> >> >>> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
> >>> >> >>> > > >
> >>> >> >>> > >
> >>> >> >>> >
> >>> >> >>>
> >>> >> >>
> >>> >>
> >>> >
> >>>
> >>
>
