Thanks for the explanations, Till.
Keeping the initial design as simple as possible sounds good to me.
There's no further concern from my side.

Thank you~

Xintong Song



On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <reed...@gmail.com> wrote:

> Thanks Till for the explanation and the follow up actions.
> That sounds good to me.
>
> Thanks,
> Zhu
>
> On Tue, Jan 26, 2021 at 7:28 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Thanks a lot for all your feedback. Let me try to answer it.
> >
> > # ScaleUpController vs. RescaleController
> >
> > At the moment, the idea of the declarative scheduler is to run a job with
> > a parallelism that is as close to the target value as possible but never
> > exceeds it. Since the target value is currently fixed (this will change
> > once auto-scaling is supported), we never have to scale down. In fact,
> > scale-down events only happen if the system loses slots, and then you
> > cannot choose whether to scale down or not. This, and the idea of keeping
> > the initial design as simple as possible, motivated the naming of
> > ScaleUpController. Once we support auto-scaling, we might have to rename
> > this interface. However, since I don't fully know how things will look
> > with auto-scaling, I would like to refrain from that change now.
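> >
> > For reference, here is a rough sketch of the interface as I currently
> > picture it (parameter names are illustrative; the FLIP has the
> > authoritative version):
> >
> >   public interface ScaleUpController {
> >       // Returns true if going from the current to the new cumulative
> >       // parallelism is worth restarting the job.
> >       boolean canScaleUp(
> >               int currentCumulativeParallelism,
> >               int newCumulativeParallelism);
> >   }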
> >
> > # Relationship between declarative scheduler and existing implementations
> >
> > The declarative scheduler will implement the SchedulerNG interface. At the
> > moment I am not aware of any required changes to this interface. Also, the
> > way the JobMaster interacts with the scheduler won't change, to the best
> > of my knowledge.
> >
> > # Awareness of SlotAllocator of the ExecutionGraph
> >
> > The idea is to make the SlotAllocator not aware of the ExecutionGraph,
> > because the ExecutionGraph can only be created after the SlotAllocator has
> > decided on the parallelism the job can be run with. Moreover, the idea is
> > that the JobInformation instance contains all the information about a job
> > that is required to make this decision.
> >
> > The SlotAllocator also reserves the slots for a job before the
> > ExecutionGraph is created. Consequently, we need a way to associate the
> > slots with the Executions of the ExecutionGraph. Here we have decided to
> > use the ExecutionVertexID as the identifier. We could also introduce a new
> > identifier, as long as we can map it to the ExecutionVertexID. We could,
> > for example, use the JobVertexID and the subtask index as the identifier,
> > but this is exactly what the ExecutionVertexID is. That's why we decided
> > to reuse it for the time being.
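> >
> > To illustrate the separation, a rough sketch (simplified, not the
> > authoritative signatures; see the FLIP for those):
> >
> >   // The allocator only sees job metadata and the available slots; no
> >   // ExecutionGraph exists at this point.
> >   interface SlotAllocator {
> >       // decide on the parallelism and reserve matching slots
> >       Optional<ParallelismAndResourceAssignments> determineParallelism(
> >               JobInformation jobInformation,
> >               Collection<? extends SlotInfo> freeSlots);
> >   }
> >
> >   // Reserved slots are keyed by ExecutionVertexID (JobVertexID plus
> >   // subtask index) so that the ExecutionGraph created afterwards can
> >   // pick them up.
> >   final class ParallelismAndResourceAssignments {
> >       Map<ExecutionVertexID, LogicalSlot> assignedSlots;
> >       Map<JobVertexID, Integer> parallelismPerJobVertex;
> >   }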
> >
> > # ScaleUpController
> >
> > ## Cumulative parallelism
> >
> > Yes, the cumulative parallelism is the sum of the parallelisms of all
> > vertices, i.e. the total number of tasks.
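> >
> > In code terms, roughly (accessor names are illustrative):
> >
> >   int cumulativeParallelism = jobInformation.getVertices().stream()
> >           .mapToInt(VertexInformation::getParallelism)
> >           .sum();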
> >
> > ## When to call the ScaleUpController
> >
> > Yes, the idea is that the scheduler will check whether one can run the job
> > with an increased parallelism whenever new slots become available. If this
> > is the case, then we will ask the ScaleUpController whether we actually
> > should scale up (e.g. whether the increase is significant enough or
> > whether enough time has passed since the last scale-up operation).
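> >
> > As a sketch of the control flow (the helper methods are made up for
> > this example):
> >
> >   // Invoked by the scheduler whenever new slots become available.
> >   void onNewResourcesAvailable() {
> >       int current = currentCumulativeParallelism();
> >       int possible = slotAllocator
> >               .determineParallelism(jobInformation, freeSlots())
> >               .map(this::cumulativeParallelismOf)
> >               .orElse(current);
> >       if (possible > current
> >               && scaleUpController.canScaleUp(current, possible)) {
> >           restartWithParallelism(possible);
> >       }
> >   }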
> >
> > ## Do we provide enough information to the ScaleUpController
> >
> > I am pretty sure that we don't provide the ScaleUpController with enough
> > information to make the best possible decision. We wanted to keep it
> > simple in the first version and iterate on the interface with the help of
> > user feedback. Since this interface doesn't fundamentally change the
> > scheduler, starting with a simple version of it shouldn't block future
> > extensions.
> >
> > # Cluster vs. job configuration
> >
> > I think you are right that it would be most flexible if one could select a
> > scheduler on a per-job basis, falling back to the cluster configuration
> > if nothing has been specified. For the sake of simplicity and to narrow
> > down the scope, I would consider this a follow-up.
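> >
> > Concretely, the cluster-wide selection could then look something like
> > this (the exact option key and value are still to be finalized):
> >
> >   Configuration configuration = new Configuration();
> >   // cluster-wide default scheduler; a per-job override would be a
> >   // follow-up
> >   configuration.setString("jobmanager.scheduler", "declarative");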
> >
> > # Performance on failover
> >
> > I agree that the performance won't be optimal in the failover case because
> > of 1) having to recreate the EG and 2) redundant IO operations. For 1), I
> > agree that FLINK-21110 will help a lot. For 2), moving the IO operations
> > out of the EG could be a good improvement. By the same argument as above,
> > I would consider this a follow-up, because I would like to keep the
> > initial design as simple as possible, and I think that these performance
> > optimizations are not required to make this feature work.
> >
> > # Lost execution history
> >
> > You are right, Zhu Zhu, that recreating the ExecutionGraph causes the loss
> > of information. We are currently investigating which information is
> > strictly required and needs to be maintained across ExecutionGraph
> > creations (mainly for tests to succeed). One idea is to either store this
> > information outside of the ExecutionGraph or to initialize the new
> > ExecutionGraph with the information from the previous instance. Most
> > likely, though, we won't guarantee that all counters, timestamps and
> > metrics are correctly maintained across failovers in the first version.
> > It is much the same argument as above: this is not strictly required to
> > make this feature work. Maybe this means that the feature is not
> > production ready for some users, but I think this is ok.
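> >
> > One possible shape of the second idea, with purely illustrative names
> > (no such API exists today):
> >
> >   // Seed the new ExecutionGraph with the bookkeeping of the previous
> >   // one so that timestamps and prior attempts survive the restart.
> >   ExecutionGraph newGraph =
> >           createExecutionGraph(jobInformation, newParallelism);
> >   newGraph.initializeHistoryFrom(
> >           previousGraph.getStatusTimestamps(),
> >           previousGraph.getPriorExecutionAttempts());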
> >
> > In general, to fully integrate the declarative scheduler with the web UI,
> > we have to be able to display changing ExecutionGraphs. Ideally, we would
> > have something like a timeline where one can select the time for which to
> > display the state of the job execution. If one goes all the way to the
> > right, the system shows the live state; all other positions show the state
> > at the present time minus some offset.
> >
> > I will add the follow-ups to the FLIP as potential improvements in order
> > to keep track of them.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> >> Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>
> >>
> >> The design looks generally good to me. I have 2 concerns though:
> >>
> >> # performance on failover
> >> I can see that an ExecutionGraph will be built and initialized on each
> >> task failure. This process can be slow:
> >> 1. The topology building can be very slow (much slower than the
> >> scheduling or restarting process) when the job scale becomes large (see
> >> FLINK-20612). Luckily, FLINK-21110 will improve it.
> >> 2. The input/output format initialization can be slow due to possible IO
> >> work against external services. Maybe we should extract it from the
> >> ExecutionGraph building?
> >>
> >> # execution history lost
> >> After building and using a new ExecutionGraph, the execution history in
> >> the old graph will be lost, including the status timestamps of the job
> >> and prior execution attempts and their failures. Should we let the new
> >> EG inherit this information from the prior EG? Maybe as a future plan,
> >> with further discussions regarding the varying parallelism.
> >>
> >> Thanks,
> >> Zhu
> >>
> >> On Sun, Jan 24, 2021 at 11:55 AM, Xintong Song <tonysong...@gmail.com> wrote:
> >>
> >>> Thanks for preparing the FLIP and starting the discussion, Till.
> >>>
> >>> I have a few questions trying to understand the design.
> >>>
> >>> ## What is the relationship between the current and new schedulers?
> >>> IIUC, the new declarative scheduler will coexist with the current
> >>> scheduler, as an alternative that the user needs to explicitly switch
> >>> to. Then does it require any changes to the scheduler interfaces and how
> >>> the JobMaster interacts with it?
> >>>
> >>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> >>> Looking at the interfaces, it seems to me that `SlotAllocator` only
> >>> takes `JobInformation` and `VertexInformation` as topology inputs.
> >>> However, it generates `ParallelismAndResourceAssignments`, which maps
> >>> slots to `ExecutionVertexID`s. It's a bit confusing that
> >>> `ExecutionGraph` is generated outside `SlotAllocator` while
> >>> `ExecutionVertexID`s are generated inside `SlotAllocator`.
> >>>
> >>> ## About `ScaleUpController#canScaleUp`
> >>>
> >>> ### What is cumulative parallelism?
> >>> The interface shows that the cumulative parallelism is a single number
> >>> per job graph. I assume this is the sum of the parallelisms of all
> >>> vertices?
> >>>
> >>> ### Is this interface expected to be called before or after
> >>> `SlotAllocator#determineParallelism`?
> >>> IIUC, when new resources appear, the scheduler always calls
> >>> `SlotAllocator#determineParallelism` to generate a new plan based on the
> >>> available resources, and `ScaleUpController#canScaleUp` is then called
> >>> to decide whether to apply the new plan, according to whether the
> >>> increase is significant, how long the job has been running since the
> >>> last restart, etc. Is that correct?
> >>>
> >>> ### The cumulative parallelism may not be enough for deciding whether
> >>> the job should be scaled up.
> >>> I'm assuming my understanding of the above questions is correct. Please
> >>> correct me if not.
> >>>
> >>> I noticed Chesnay's comment on the wiki page about making the decision
> >>> based on when the job entered the execution state last time.
> >>>
> >>> In addition to that, there could also be situations where jobs may not
> >>> benefit much from an increment in cumulative parallelism.
> >>> E.g., take a topology A -> B, where A and B are in different slot
> >>> sharing groups, and the current parallelism of both A and B is 2. When
> >>> 1 new slot appears, `SlotAllocator` may suggest increasing the
> >>> parallelism of A to 3. But this may not be a significantly beneficial
> >>> change for the job, because the overall performance is still
> >>> bottlenecked by the parallelism of B.
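> >>>
> >>> Roughly speaking (illustrative numbers only):
> >>>
> >>>   // Throughput is capped by the most constrained stage.
> >>>   int parallelismA = 3; // scaled up from 2
> >>>   int parallelismB = 2; // unchanged
> >>>   int effective = Math.min(parallelismA, parallelismB); // still 2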
> >>>
> >>> ## Cluster vs. Job configuration
> >>> I'm not entirely sure about specifying which scheduler to use via a
> >>> cluster-level configuration option. Each job in a Flink cluster has its
> >>> own JobMaster, and which scheduler to use is internal to that JobMaster.
> >>> I understand that the declarative scheduler requires the cluster to use
> >>> declarative resource management. However, other jobs should still be
> >>> allowed to use other scheduler implementations that also support
> >>> declarative resource management (e.g. the current `DefaultScheduler`).
> >>> Maybe we should treat the cluster-level configuration option as the
> >>> default scheduler, and allow a job to specify a different scheduler in
> >>> its execution config. This is similar to how we specify which state
> >>> backend to use.
> >>>
> >>> ## Minor: It might be better to also show in the state machine figure
> >>> that it can go from `Executing` to `Restarting` when new resources
> >>> appear.
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>>
> >>>
> >>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>>
> >>> > Till, thanks a lot for the proposal.
> >>> >
> >>> > Even if the initial phase is only to support scale-up, maybe the
> >>> > "ScaleUpController" interface should be called "RescaleController" so
> >>> > that scale-down can be added in the future.
> >>> >
> >>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org>
> >>> > wrote:
> >>> >
> >>> > > Hi everyone,
> >>> > >
> >>> > > I would like to start a discussion about adding a new type of
> >>> > > scheduler to Flink. The declarative scheduler will first declare the
> >>> > > required resources and wait for them before deciding on the actual
> >>> > > parallelism of a job. Thereby it can better handle situations where
> >>> > > resource requirements cannot be fully fulfilled. Moreover, it will
> >>> > > act as a building block for the reactive mode, where Flink should
> >>> > > scale to the maximum of the currently available resources.
> >>> > >
> >>> > > Please find more details in the FLIP wiki document [1]. Looking
> >>> > > forward to your feedback.
> >>> > >
> >>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >>> > >
> >>> > > Cheers,
> >>> > > Till
> >>> > >
> >>> >
> >>>
> >>
>
