Thanks Till for the explanation and the follow up actions.
That sounds good to me.

Thanks,
Zhu

Till Rohrmann <trohrm...@apache.org> 于2021年1月26日周二 下午7:28写道:

> Thanks a lot for all your feedback. Let me try to answer it.
>
> # ScaleUpController vs. RescaleController
>
> At the moment, the idea of the declarative scheduler is to run a job with
> a parallelism which is as close to the target value as possible but to
> never exceed it. Since the target value is currently fixed (this will
> change once auto-scaling is supported), we never have to scale down. In
> fact, scale down events only happen if the system loses slots and then you
> cannot choose whether to scale down or not. This and the idea to keep the
> initial design as simple as possible motivated the naming of
> ScaleUpController. Once we support auto-scaling, we might have to rename
> this interface. However, since I don't fully know how things will look like
> with auto-scaling, I would like to refrain from this change now.
>
> # Relationship between declarative scheduler and existing implementations
>
> The declarative scheduler will implement the SchedulerNG interface. At the
> moment I am not aware of any required changes to this interface. Also the
> way the JobMaster interacts with the scheduler won't change to the best of
> my knowledge.
>
> # Awareness of SlotAllocator of the ExecutionGraph
>
> The idea is to make the SlotAllocator not aware of the ExecutionGraph
> because it can only be created after the SlotAllocator has decided on the
> parallelism the job can be run with. Moreover, the idea is that the
> JobInformation instance contains all the required information of a job to
> make this decision.
>
> The SlotAllocator also reserves the slots for a job before the
> ExecutionGraph is created. Consequently, we need a way to associate the
> slots with the Executions of the ExecutionGraph. Here we have decided to
> use the ExecutionVertexID as the identificator. We could also introduce a
> new identificator as long as we can map this one to the ExecutionVertexID.
> We could for example use JobVertexID and subtask id as the identificator
> but this is exactly what the ExecutionVertexID is. That's why we decided to
> reuse it for the time being.
>
> # ScaleUpController
>
> ## Cumulative parallelism
>
> Yes, the cumulative parallelism is the sum of all tasks.
>
> ## When to call the ScaleUpController
>
> Yes, the idea is that the scheduler will check whether one can run the job
> with an increased parallelism if there are new slots available. If this is
> the case, then we will ask the ScaleUpController, whether we actually
> should scale up (e.g. whether the increase is significant enough or whether
> enough time has passed between the last scale up operation).
>
> ## Do we provide enough information to the ScaleUpController
>
> I am pretty sure that we don't provide the ScaleUpController enough
> information to make the best possible decision. We wanted to keep it simple
> in the first version and iterate on the interface with the help of user
> feedback. I think that this interface won't fundamentally change the
> scheduler and, hence, it shouldn't block future extensions by starting with
> a simple interface.
>
> # Cluster vs. job configuration
>
> I think you are right that it would be most flexible if one could select a
> scheduler on a per job basis with falling back to the cluster configuration
> if nothing has been specified. For the sake of simplicity and narrowing
> down the scope I would consider this as a follow up.
>
> # Performance on failover
>
> I agree that the performance won't be optimal in the failover case because
> of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
> agree that FLINK-21110 will help a lot. For 2) moving the IO operations out
> of the EG could be a good improvement. With the same argument as above, I
> would consider this a follow up because I would like to keep the initial
> design as simple as possible and I think that these performance
> optimizations are not required to make this feature work.
>
> # Lost execution history
>
> You are right Zhu Zhu that recreating the ExecutionGraph causes the loss
> of information. We are currently investigating which information is
> strictly required and needs to be maintained across ExecutionGraph
> creations (mainly for tests to succeed). One idea is to either store this
> information outside of the ExecutionGraph or to initialize the
> ExecutionGraph with the information from the previous instance. Most
> likely, we won't give a guarantee that all counters, timestamps and metrics
> are correctly maintained across failovers in the first version, though. It
> is a bit the same argument as above that this is not strictly required to
> make this feature work. Maybe this means that this feature is not
> production ready for some users, but I think this is ok.
>
> In general, to fully integrate the declarative scheduler with the web ui
> we have to be able to display changing ExecutionGraphs. Ideally, we would
> have something like a timeline where one can select the time for which to
> display the state of the job execution. If one goes all the way to the
> right, the system shows the live state and all other positions are the
> present time minus some offset.
>
> I will add the follow ups to the FLIP as potential improvements in order
> to keep track of them.
>
> Cheers,
> Till
>
> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
>
>> Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>
>>
>> The design looks generally good to me. I have 2 concerns though:
>>
>> # performance on failover
>> I can see is that an ExecutionGraph will be built and initialized on each
>> task failure. The process is possibly to be slow:
>> 1. the topology building can be very slow (much slower than the
>> scheduling
>> or restarting process) when the job scale becomes large (see
>> FLINK-20612).
>> Luckily FLINK-21110 will improve it.
>> 2. the input/output format can be slow due to possible IO work to
>> external services.
>> Maybe we should extract it out from ExecutionGraph building?
>>
>> # execution history lost
>> After building and using a new ExecutionGraph, the execution history in
>> the
>> old graph will be lost, including status timestamps of the job, prior
>> execution
>> attempts and their failures. Should we let the new EG inherit these
>> information
>> from prior EG? Maybe as a future plan with further discussions regarding
>> the
>> varying parallelism.
>>
>> Thanks,
>> Zhu
>>
>> Xintong Song <tonysong...@gmail.com> 于2021年1月24日周日 上午11:55写道:
>>
>>> Thanks for preparing the FLIP and starting the discussion, Till.
>>>
>>> I have a few questions trying to understand the design.
>>>
>>> ## What is the relationship between the current and new schedulers?
>>> IIUC, the new declarative scheduler will coexist with the current
>>> scheduler, as an alternative that the user needs to explicitly switch to.
>>> Then does it require any changes to the scheduler interfaces and how the
>>> JobMaster interacts with it?
>>>
>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
>>> Looking at the interfaces, it seems to me that `SlotAllocator` only takes
>>> `JobInformation` and `VertexInformation` as topology inputs. However, it
>>> generates `ParallelismAndResourceAssignments` which maps slots to
>>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
>>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
>>> generated
>>> inside `SlotAllocator`.
>>>
>>> ## About `ScaleUpController#canScaleUp`
>>>
>>> ### What is cumulative parallelism?
>>> The interface shows that the cumulative parallelism is a single number
>>> per
>>> job graph. I assume this is the sum of parallelism of all vertices?
>>>
>>> ### Is this interface expected to be called before or after
>>> `SlotAllocator#determineParallelism`?
>>> IIUC, on new resources appear, the scheduler always calls
>>> `SlotAllocator#determineParallelism` to generate a new plan based on the
>>> available resources, and `ScaleUpController#canScaleUp` is then called to
>>> decide whether to apply the new plan, according to whether the
>>> increasement
>>> is significant, how long the job has been running since last restart,
>>> etc.
>>> Is that correct?
>>>
>>> ### The cumulative parallelism may not be enough for deciding whether the
>>> job should be scaled up.
>>> I'm assuming my understanding for the above questions are correct. Please
>>> correct me if not.
>>>
>>> I noticed Chesnay's comment on the wiki page about making the decision
>>> based on when the job entered the execution state last time.
>>>
>>> In addition to that, there could also be situations where jobs may not
>>> benefit much an increment in cumulative parallelism.
>>> E.g., for a topology A -> B, where A and B are in different slot sharing
>>> groups, and the current parallelism for both A and B are 2. When 1 new
>>> slot
>>> appears, `SlotAllocator` may suggest increasing parallelism of A to 3.
>>> But
>>> this may not be a significant beneficial change for the job because the
>>> overall performance is still bottlenecked by the parallelism of B.
>>>
>>> ## Cluster vs. Job configuration
>>> I'm not entirely sure about specifying which scheduler to be used via a
>>> cluster level configuration option. Each job in a Flink cluster has its
>>> own
>>> JobMaster, and which scheduler to use is internal to that JobMaster. I
>>> understand that the declarative scheduler requires the cluster to use
>>> declarative resource management. However, other jobs should still be
>>> allowed to use other scheduler implementations that also support
>>> declarative resource management (e.g. the current `DefaultScheduler`).
>>> Maybe we should consider the cluster level configuration option as a
>>> default scheduler, and allow the job to specify a different scheduler in
>>> its execution config. This is similar to how we specify which state
>>> backend
>>> to be used.
>>>
>>> ## Minor: It might be better to also show in the state machine figure
>>> that
>>> it can go from `Executing` to `Restarting` when new resources appear.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>> > Till, thanks a lot for the proposal.
>>> >
>>> > Even if the initial phase is only to support scale-up, maybe the
>>> > "ScaleUpController" interface should be called "RescaleController" so
>>> that
>>> > in the future scale-down can be added.
>>> >
>>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org>
>>> > wrote:
>>> >
>>> > > Hi everyone,
>>> > >
>>> > > I would like to start a discussion about adding a new type of
>>> scheduler
>>> > to
>>> > > Flink. The declarative scheduler will first declare the required
>>> > resources
>>> > > and wait for them before deciding on the actual parallelism of a job.
>>> > > Thereby it can better handle situations where resources cannot be
>>> fully
>>> > > fulfilled. Moreover, it will act as a building block for the reactive
>>> > mode
>>> > > where Flink should scale to the maximum of the currently available
>>> > > resources.
>>> > >
>>> > > Please find more details in the FLIP wiki document [1]. Looking
>>> forward
>>> > to
>>> > > your feedback.
>>> > >
>>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
>>> > >
>>> > > Cheers,
>>> > > Till
>>> > >
>>> >
>>>
>>

Reply via email to