Thanks a lot for all your input. I have update the FLIP-160 with your
suggestions:

1) Add job configuration as a follow up
2) Pull out IO operations out of the ExecutionGraph if the failover becomes
too slow
3) Introduce a configuration parameter for the timeout in the "Waiting for
resources" state (coming from the FLIP-159 discussion)

Next, I will create the vote thread for this FLIP.

Cheers,
Till

On Fri, Jan 29, 2021 at 10:06 AM Chesnay Schepler <ches...@apache.org>
wrote:

> Yes, since we're only operating within the scheduler, which exists
> separately for each job, we don't have to worry about collisions with
> other jobs.
>
> On 1/27/2021 11:08 AM, Yangze Guo wrote:
> > Thanks for preparing the FLIP and driving the discussion, Till. All of
> > my questions have already been answered in the previous discussion.
> >
> > I just have one minor reminder regarding using ExecutionVertexID as
> > the identificator. The JobVertexID is generated according to the
> > topology instead of generated randomly. Thus, it's not guaranteed to
> > be unique across different jobs and different execution. This
> > characteristic is also inherited by the ExecutionVertexID. UUIC, the
> > ParallelismAndResourceAssignments is a job-level concept so it will
> > work well.
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <tonysong...@gmail.com>
> wrote:
> >> Thanks for the explanations, Till.
> >> Keeping the initial design as simple as possible sounds good to me.
> >> There's no further concern from my side.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <reed...@gmail.com> wrote:
> >>
> >>> Thanks Till for the explanation and the follow up actions.
> >>> That sounds good to me.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Till Rohrmann <trohrm...@apache.org> 于2021年1月26日周二 下午7:28写道:
> >>>
> >>>> Thanks a lot for all your feedback. Let me try to answer it.
> >>>>
> >>>> # ScaleUpController vs. RescaleController
> >>>>
> >>>> At the moment, the idea of the declarative scheduler is to run a job
> with
> >>>> a parallelism which is as close to the target value as possible but to
> >>>> never exceed it. Since the target value is currently fixed (this will
> >>>> change once auto-scaling is supported), we never have to scale down.
> In
> >>>> fact, scale down events only happen if the system loses slots and then
> >>> you
> >>>> cannot choose whether to scale down or not. This and the idea to keep
> the
> >>>> initial design as simple as possible motivated the naming of
> >>>> ScaleUpController. Once we support auto-scaling, we might have to
> rename
> >>>> this interface. However, since I don't fully know how things will look
> >>> like
> >>>> with auto-scaling, I would like to refrain from this change now.
> >>>>
> >>>> # Relationship between declarative scheduler and existing
> implementations
> >>>>
> >>>> The declarative scheduler will implement the SchedulerNG interface. At
> >>> the
> >>>> moment I am not aware of any required changes to this interface. Also
> the
> >>>> way the JobMaster interacts with the scheduler won't change to the
> best
> >>> of
> >>>> my knowledge.
> >>>>
> >>>> # Awareness of SlotAllocator of the ExecutionGraph
> >>>>
> >>>> The idea is to make the SlotAllocator not aware of the ExecutionGraph
> >>>> because it can only be created after the SlotAllocator has decided on
> the
> >>>> parallelism the job can be run with. Moreover, the idea is that the
> >>>> JobInformation instance contains all the required information of a
> job to
> >>>> make this decision.
> >>>>
> >>>> The SlotAllocator also reserves the slots for a job before the
> >>>> ExecutionGraph is created. Consequently, we need a way to associate
> the
> >>>> slots with the Executions of the ExecutionGraph. Here we have decided
> to
> >>>> use the ExecutionVertexID as the identificator. We could also
> introduce a
> >>>> new identificator as long as we can map this one to the
> >>> ExecutionVertexID.
> >>>> We could for example use JobVertexID and subtask id as the
> identificator
> >>>> but this is exactly what the ExecutionVertexID is. That's why we
> decided
> >>> to
> >>>> reuse it for the time being.
> >>>>
> >>>> # ScaleUpController
> >>>>
> >>>> ## Cumulative parallelism
> >>>>
> >>>> Yes, the cumulative parallelism is the sum of all tasks.
> >>>>
> >>>> ## When to call the ScaleUpController
> >>>>
> >>>> Yes, the idea is that the scheduler will check whether one can run the
> >>> job
> >>>> with an increased parallelism if there are new slots available. If
> this
> >>> is
> >>>> the case, then we will ask the ScaleUpController, whether we actually
> >>>> should scale up (e.g. whether the increase is significant enough or
> >>> whether
> >>>> enough time has passed between the last scale up operation).
> >>>>
> >>>> ## Do we provide enough information to the ScaleUpController
> >>>>
> >>>> I am pretty sure that we don't provide the ScaleUpController enough
> >>>> information to make the best possible decision. We wanted to keep it
> >>> simple
> >>>> in the first version and iterate on the interface with the help of
> user
> >>>> feedback. I think that this interface won't fundamentally change the
> >>>> scheduler and, hence, it shouldn't block future extensions by starting
> >>> with
> >>>> a simple interface.
> >>>>
> >>>> # Cluster vs. job configuration
> >>>>
> >>>> I think you are right that it would be most flexible if one could
> select
> >>> a
> >>>> scheduler on a per job basis with falling back to the cluster
> >>> configuration
> >>>> if nothing has been specified. For the sake of simplicity and
> narrowing
> >>>> down the scope I would consider this as a follow up.
> >>>>
> >>>> # Performance on failover
> >>>>
> >>>> I agree that the performance won't be optimal in the failover case
> >>> because
> >>>> of 1) having to recreate the EG and 2) redundant IO operations. For
> 1) I
> >>>> agree that FLINK-21110 will help a lot. For 2) moving the IO
> operations
> >>> out
> >>>> of the EG could be a good improvement. With the same argument as
> above, I
> >>>> would consider this a follow up because I would like to keep the
> initial
> >>>> design as simple as possible and I think that these performance
> >>>> optimizations are not required to make this feature work.
> >>>>
> >>>> # Lost execution history
> >>>>
> >>>> You are right Zhu Zhu that recreating the ExecutionGraph causes the
> loss
> >>>> of information. We are currently investigating which information is
> >>>> strictly required and needs to be maintained across ExecutionGraph
> >>>> creations (mainly for tests to succeed). One idea is to either store
> this
> >>>> information outside of the ExecutionGraph or to initialize the
> >>>> ExecutionGraph with the information from the previous instance. Most
> >>>> likely, we won't give a guarantee that all counters, timestamps and
> >>> metrics
> >>>> are correctly maintained across failovers in the first version,
> though.
> >>> It
> >>>> is a bit the same argument as above that this is not strictly
> required to
> >>>> make this feature work. Maybe this means that this feature is not
> >>>> production ready for some users, but I think this is ok.
> >>>>
> >>>> In general, to fully integrate the declarative scheduler with the web
> ui
> >>>> we have to be able to display changing ExecutionGraphs. Ideally, we
> would
> >>>> have something like a timeline where one can select the time for
> which to
> >>>> display the state of the job execution. If one goes all the way to the
> >>>> right, the system shows the live state and all other positions are the
> >>>> present time minus some offset.
> >>>>
> >>>> I will add the follow ups to the FLIP as potential improvements in
> order
> >>>> to keep track of them.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> >>>>
> >>>>> Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>
> >>>>>
> >>>>> The design looks generally good to me. I have 2 concerns though:
> >>>>>
> >>>>> # performance on failover
> >>>>> I can see is that an ExecutionGraph will be built and initialized on
> >>> each
> >>>>> task failure. The process is possibly to be slow:
> >>>>> 1. the topology building can be very slow (much slower than the
> >>>>> scheduling
> >>>>> or restarting process) when the job scale becomes large (see
> >>>>> FLINK-20612).
> >>>>> Luckily FLINK-21110 will improve it.
> >>>>> 2. the input/output format can be slow due to possible IO work to
> >>>>> external services.
> >>>>> Maybe we should extract it out from ExecutionGraph building?
> >>>>>
> >>>>> # execution history lost
> >>>>> After building and using a new ExecutionGraph, the execution history
> in
> >>>>> the
> >>>>> old graph will be lost, including status timestamps of the job, prior
> >>>>> execution
> >>>>> attempts and their failures. Should we let the new EG inherit these
> >>>>> information
> >>>>> from prior EG? Maybe as a future plan with further discussions
> regarding
> >>>>> the
> >>>>> varying parallelism.
> >>>>>
> >>>>> Thanks,
> >>>>> Zhu
> >>>>>
> >>>>> Xintong Song <tonysong...@gmail.com> 于2021年1月24日周日 上午11:55写道:
> >>>>>
> >>>>>> Thanks for preparing the FLIP and starting the discussion, Till.
> >>>>>>
> >>>>>> I have a few questions trying to understand the design.
> >>>>>>
> >>>>>> ## What is the relationship between the current and new schedulers?
> >>>>>> IIUC, the new declarative scheduler will coexist with the current
> >>>>>> scheduler, as an alternative that the user needs to explicitly
> switch
> >>> to.
> >>>>>> Then does it require any changes to the scheduler interfaces and how
> >>> the
> >>>>>> JobMaster interacts with it?
> >>>>>>
> >>>>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> >>>>>> Looking at the interfaces, it seems to me that `SlotAllocator` only
> >>> takes
> >>>>>> `JobInformation` and `VertexInformation` as topology inputs.
> However,
> >>> it
> >>>>>> generates `ParallelismAndResourceAssignments` which maps slots to
> >>>>>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
> >>>>>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
> >>>>>> generated
> >>>>>> inside `SlotAllocator`.
> >>>>>>
> >>>>>> ## About `ScaleUpController#canScaleUp`
> >>>>>>
> >>>>>> ### What is cumulative parallelism?
> >>>>>> The interface shows that the cumulative parallelism is a single
> number
> >>>>>> per
> >>>>>> job graph. I assume this is the sum of parallelism of all vertices?
> >>>>>>
> >>>>>> ### Is this interface expected to be called before or after
> >>>>>> `SlotAllocator#determineParallelism`?
> >>>>>> IIUC, on new resources appear, the scheduler always calls
> >>>>>> `SlotAllocator#determineParallelism` to generate a new plan based on
> >>> the
> >>>>>> available resources, and `ScaleUpController#canScaleUp` is then
> called
> >>> to
> >>>>>> decide whether to apply the new plan, according to whether the
> >>>>>> increasement
> >>>>>> is significant, how long the job has been running since last
> restart,
> >>>>>> etc.
> >>>>>> Is that correct?
> >>>>>>
> >>>>>> ### The cumulative parallelism may not be enough for deciding
> whether
> >>> the
> >>>>>> job should be scaled up.
> >>>>>> I'm assuming my understanding for the above questions are correct.
> >>> Please
> >>>>>> correct me if not.
> >>>>>>
> >>>>>> I noticed Chesnay's comment on the wiki page about making the
> decision
> >>>>>> based on when the job entered the execution state last time.
> >>>>>>
> >>>>>> In addition to that, there could also be situations where jobs may
> not
> >>>>>> benefit much an increment in cumulative parallelism.
> >>>>>> E.g., for a topology A -> B, where A and B are in different slot
> >>> sharing
> >>>>>> groups, and the current parallelism for both A and B are 2. When 1
> new
> >>>>>> slot
> >>>>>> appears, `SlotAllocator` may suggest increasing parallelism of A to
> 3.
> >>>>>> But
> >>>>>> this may not be a significant beneficial change for the job because
> the
> >>>>>> overall performance is still bottlenecked by the parallelism of B.
> >>>>>>
> >>>>>> ## Cluster vs. Job configuration
> >>>>>> I'm not entirely sure about specifying which scheduler to be used
> via a
> >>>>>> cluster level configuration option. Each job in a Flink cluster has
> its
> >>>>>> own
> >>>>>> JobMaster, and which scheduler to use is internal to that
> JobMaster. I
> >>>>>> understand that the declarative scheduler requires the cluster to
> use
> >>>>>> declarative resource management. However, other jobs should still be
> >>>>>> allowed to use other scheduler implementations that also support
> >>>>>> declarative resource management (e.g. the current
> `DefaultScheduler`).
> >>>>>> Maybe we should consider the cluster level configuration option as a
> >>>>>> default scheduler, and allow the job to specify a different
> scheduler
> >>> in
> >>>>>> its execution config. This is similar to how we specify which state
> >>>>>> backend
> >>>>>> to be used.
> >>>>>>
> >>>>>> ## Minor: It might be better to also show in the state machine
> figure
> >>>>>> that
> >>>>>> it can go from `Executing` to `Restarting` when new resources
> appear.
> >>>>>>
> >>>>>> Thank you~
> >>>>>>
> >>>>>> Xintong Song
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com>
> >>> wrote:
> >>>>>>> Till, thanks a lot for the proposal.
> >>>>>>>
> >>>>>>> Even if the initial phase is only to support scale-up, maybe the
> >>>>>>> "ScaleUpController" interface should be called "RescaleController"
> so
> >>>>>> that
> >>>>>>> in the future scale-down can be added.
> >>>>>>>
> >>>>>>> On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <
> trohrm...@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> I would like to start a discussion about adding a new type of
> >>>>>> scheduler
> >>>>>>> to
> >>>>>>>> Flink. The declarative scheduler will first declare the required
> >>>>>>> resources
> >>>>>>>> and wait for them before deciding on the actual parallelism of a
> >>> job.
> >>>>>>>> Thereby it can better handle situations where resources cannot be
> >>>>>> fully
> >>>>>>>> fulfilled. Moreover, it will act as a building block for the
> >>> reactive
> >>>>>>> mode
> >>>>>>>> where Flink should scale to the maximum of the currently available
> >>>>>>>> resources.
> >>>>>>>>
> >>>>>>>> Please find more details in the FLIP wiki document [1]. Looking
> >>>>>> forward
> >>>>>>> to
> >>>>>>>> your feedback.
> >>>>>>>>
> >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Till
> >>>>>>>>
>
>

Reply via email to