Thanks a lot for all your input. I have updated FLIP-160 with your suggestions:
1) Add job configuration as a follow-up
2) Pull IO operations out of the ExecutionGraph if the failover becomes too slow
3) Introduce a configuration parameter for the timeout in the "Waiting for resources" state (coming from the FLIP-159 discussion)

Next, I will create the vote thread for this FLIP.

Cheers,
Till

On Fri, Jan 29, 2021 at 10:06 AM Chesnay Schepler <ches...@apache.org> wrote:

> Yes, since we're only operating within the scheduler, which exists separately for each job, we don't have to worry about collisions with other jobs.
>
> On 1/27/2021 11:08 AM, Yangze Guo wrote:
> > Thanks for preparing the FLIP and driving the discussion, Till. All of my questions have already been answered in the previous discussion.
> >
> > I just have one minor reminder regarding using ExecutionVertexID as the identifier. The JobVertexID is generated according to the topology instead of being generated randomly. Thus, it's not guaranteed to be unique across different jobs and different executions. This characteristic is also inherited by the ExecutionVertexID. IIUC, the ParallelismAndResourceAssignments is a job-level concept, so it will work well.
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <tonysong...@gmail.com> wrote:
> >> Thanks for the explanations, Till.
> >> Keeping the initial design as simple as possible sounds good to me. There's no further concern from my side.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >> On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <reed...@gmail.com> wrote:
> >>
> >>> Thanks Till for the explanation and the follow-up actions. That sounds good to me.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Jan 26, 2021 at 7:28 PM:
> >>>
> >>>> Thanks a lot for all your feedback. Let me try to answer it.
> >>>>
> >>>> # ScaleUpController vs. RescaleController
> >>>>
> >>>> At the moment, the idea of the declarative scheduler is to run a job with a parallelism which is as close to the target value as possible but never to exceed it. Since the target value is currently fixed (this will change once auto-scaling is supported), we never have to scale down. In fact, scale-down events only happen if the system loses slots, and then you cannot choose whether to scale down or not. This, and the idea of keeping the initial design as simple as possible, motivated the naming of ScaleUpController. Once we support auto-scaling, we might have to rename this interface. However, since I don't fully know how things will look with auto-scaling, I would like to refrain from this change now.
> >>>>
> >>>> # Relationship between declarative scheduler and existing implementations
> >>>>
> >>>> The declarative scheduler will implement the SchedulerNG interface. At the moment I am not aware of any required changes to this interface. Also, the way the JobMaster interacts with the scheduler won't change, to the best of my knowledge.
> >>>>
> >>>> # Awareness of SlotAllocator of the ExecutionGraph
> >>>>
> >>>> The idea is to make the SlotAllocator not aware of the ExecutionGraph because the latter can only be created after the SlotAllocator has decided on the parallelism the job can be run with. Moreover, the idea is that the JobInformation instance contains all the required information of a job to make this decision.
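(For illustration, a minimal, self-contained Java sketch of the idea described above: an allocator that decides on a per-vertex parallelism purely from the job description and the offered slots, before any ExecutionGraph exists. All names and signatures below are illustrative stand-ins, not the FLIP's exact API.)

import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/**
 * Illustrative sketch only (not the exact FLIP-160 interface): the allocator decides
 * on a parallelism per vertex from the job description and the offered slots alone.
 */
interface SlotAllocatorSketch {

    /** Simplified stand-in for the FLIP's JobInformation/VertexInformation. */
    interface VertexInformation {
        String vertexId();     // corresponds to the JobVertexID
        int maxParallelism();  // upper bound the allocator must never exceed
    }

    /** Simplified stand-in for a free slot offered by the resource manager. */
    interface SlotInfo {
        String slotId();
    }

    /**
     * Returns a parallelism per vertex id that fits into the given slots, or empty
     * if the job cannot run at all with the currently available resources.
     */
    Optional<Map<String, Integer>> determineParallelism(
            Collection<VertexInformation> vertices, Collection<SlotInfo> freeSlots);
}

(An implementation could, for example, cap each vertex at the smaller of its max parallelism and the number of slots available to its slot sharing group, and return empty if even parallelism 1 cannot be satisfied.)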
> >>>> The SlotAllocator also reserves the slots for a job before the ExecutionGraph is created. Consequently, we need a way to associate the slots with the Executions of the ExecutionGraph. Here we have decided to use the ExecutionVertexID as the identifier. We could also introduce a new identifier as long as we can map it to the ExecutionVertexID. We could, for example, use the JobVertexID and the subtask id as the identifier, but this is exactly what the ExecutionVertexID is. That's why we decided to reuse it for the time being.
> >>>>
> >>>> # ScaleUpController
> >>>>
> >>>> ## Cumulative parallelism
> >>>>
> >>>> Yes, the cumulative parallelism is the sum of all tasks.
> >>>>
> >>>> ## When to call the ScaleUpController
> >>>>
> >>>> Yes, the idea is that the scheduler will check whether one can run the job with an increased parallelism if there are new slots available. If this is the case, then we will ask the ScaleUpController whether we actually should scale up (e.g. whether the increase is significant enough or whether enough time has passed since the last scale-up operation).
> >>>>
> >>>> ## Do we provide enough information to the ScaleUpController
> >>>>
> >>>> I am pretty sure that we don't provide the ScaleUpController enough information to make the best possible decision. We wanted to keep it simple in the first version and iterate on the interface with the help of user feedback. I think that this interface won't fundamentally change the scheduler and, hence, it shouldn't block future extensions by starting with a simple interface.
> >>>>
> >>>> # Cluster vs. job configuration
> >>>>
> >>>> I think you are right that it would be most flexible if one could select a scheduler on a per-job basis, falling back to the cluster configuration if nothing has been specified. For the sake of simplicity and narrowing down the scope, I would consider this a follow-up.
> >>>>
> >>>> # Performance on failover
> >>>>
> >>>> I agree that the performance won't be optimal in the failover case because of 1) having to recreate the EG and 2) redundant IO operations. For 1) I agree that FLINK-21110 will help a lot. For 2) moving the IO operations out of the EG could be a good improvement. With the same argument as above, I would consider this a follow-up because I would like to keep the initial design as simple as possible, and I think that these performance optimizations are not required to make this feature work.
> >>>>
> >>>> # Lost execution history
> >>>>
> >>>> You are right, Zhu Zhu, that recreating the ExecutionGraph causes the loss of information. We are currently investigating which information is strictly required and needs to be maintained across ExecutionGraph creations (mainly for tests to succeed). One idea is to either store this information outside of the ExecutionGraph or to initialize the ExecutionGraph with the information from the previous instance. Most likely, we won't give a guarantee that all counters, timestamps and metrics are correctly maintained across failovers in the first version, though. It is a bit the same argument as above: this is not strictly required to make this feature work.
> >>>> Maybe this means that this feature is not production-ready for some users, but I think this is OK.
> >>>>
> >>>> In general, to fully integrate the declarative scheduler with the web UI we have to be able to display changing ExecutionGraphs. Ideally, we would have something like a timeline where one can select the time for which to display the state of the job execution. If one goes all the way to the right, the system shows the live state, and all other positions are the present time minus some offset.
> >>>>
> >>>> I will add the follow-ups to the FLIP as potential improvements in order to keep track of them.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> >>>>
> >>>>> Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>
> >>>>>
> >>>>> The design looks generally good to me. I have 2 concerns though:
> >>>>>
> >>>>> # performance on failover
> >>>>> I can see that an ExecutionGraph will be built and initialized on each task failure. This process may be slow:
> >>>>> 1. The topology building can be very slow (much slower than the scheduling or restarting process) when the job scale becomes large (see FLINK-20612). Luckily, FLINK-21110 will improve it.
> >>>>> 2. The input/output format can be slow due to possible IO work against external services. Maybe we should extract it out of the ExecutionGraph building?
> >>>>>
> >>>>> # execution history lost
> >>>>> After building and using a new ExecutionGraph, the execution history in the old graph will be lost, including status timestamps of the job, prior execution attempts and their failures. Should we let the new EG inherit this information from the prior EG? Maybe as a future plan, with further discussions regarding the varying parallelism.
> >>>>>
> >>>>> Thanks,
> >>>>> Zhu
> >>>>>
> >>>>> Xintong Song <tonysong...@gmail.com> wrote on Sun, Jan 24, 2021 at 11:55 AM:
> >>>>>
> >>>>>> Thanks for preparing the FLIP and starting the discussion, Till.
> >>>>>>
> >>>>>> I have a few questions trying to understand the design.
> >>>>>>
> >>>>>> ## What is the relationship between the current and new schedulers?
> >>>>>> IIUC, the new declarative scheduler will coexist with the current scheduler, as an alternative that the user needs to explicitly switch to. Then does it require any changes to the scheduler interfaces and how the JobMaster interacts with it?
> >>>>>>
> >>>>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> >>>>>> Looking at the interfaces, it seems to me that `SlotAllocator` only takes `JobInformation` and `VertexInformation` as topology inputs. However, it generates `ParallelismAndResourceAssignments`, which maps slots to `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is generated outside `SlotAllocator` while `ExecutionVertexID`s are generated inside `SlotAllocator`.
> >>>>>>
> >>>>>> ## About `ScaleUpController#canScaleUp`
> >>>>>>
> >>>>>> ### What is cumulative parallelism?
> >>>>>> The interface shows that the cumulative parallelism is a single number per job graph. I assume this is the sum of the parallelism of all vertices?
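(To make the term concrete: assuming, as confirmed earlier in the thread, that the cumulative parallelism is simply the sum of all vertex parallelisms, here is a minimal illustrative Java sketch of a scale-up check based on it. Only the method name canScaleUp comes from the FLIP; the threshold logic is a made-up example.)

import java.util.Collection;

/** Illustrative sketch only: a scale-up check driven by cumulative parallelism. */
final class ScaleUpSketch {

    /** Cumulative parallelism = sum of the parallelism of every vertex in the job. */
    static int cumulativeParallelism(Collection<Integer> vertexParallelisms) {
        return vertexParallelisms.stream().mapToInt(Integer::intValue).sum();
    }

    /**
     * Example policy: only scale up if the new plan adds a noticeable number of
     * subtasks. A real controller could additionally consider how long the job has
     * been running since the last restart, or per-vertex bottlenecks as discussed
     * further below in the thread.
     */
    static boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
        final int minIncrease = 2; // made-up threshold, purely illustrative
        return newCumulativeParallelism - currentCumulativeParallelism >= minIncrease;
    }
}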
> >>>>>> ### Is this interface expected to be called before or after `SlotAllocator#determineParallelism`?
> >>>>>> IIUC, when new resources appear, the scheduler always calls `SlotAllocator#determineParallelism` to generate a new plan based on the available resources, and `ScaleUpController#canScaleUp` is then called to decide whether to apply the new plan, according to whether the increase is significant, how long the job has been running since the last restart, etc. Is that correct?
> >>>>>>
> >>>>>> ### The cumulative parallelism may not be enough for deciding whether the job should be scaled up.
> >>>>>> I'm assuming my understanding of the above questions is correct. Please correct me if not.
> >>>>>>
> >>>>>> I noticed Chesnay's comment on the wiki page about making the decision based on when the job last entered the execution state.
> >>>>>>
> >>>>>> In addition to that, there could also be situations where jobs may not benefit much from an increment in cumulative parallelism. E.g., for a topology A -> B, where A and B are in different slot sharing groups and the current parallelism of both A and B is 2: when 1 new slot appears, `SlotAllocator` may suggest increasing the parallelism of A to 3. But this may not be a significantly beneficial change for the job because the overall performance is still bottlenecked by the parallelism of B.
> >>>>>>
> >>>>>> ## Cluster vs. Job configuration
> >>>>>> I'm not entirely sure about specifying which scheduler to use via a cluster-level configuration option. Each job in a Flink cluster has its own JobMaster, and which scheduler to use is internal to that JobMaster. I understand that the declarative scheduler requires the cluster to use declarative resource management. However, other jobs should still be allowed to use other scheduler implementations that also support declarative resource management (e.g. the current `DefaultScheduler`). Maybe we should consider the cluster-level configuration option as a default scheduler, and allow the job to specify a different scheduler in its execution config. This is similar to how we specify which state backend to use.
> >>>>>>
> >>>>>> ## Minor: It might be better to also show in the state machine figure that it can go from `Executing` to `Restarting` when new resources appear.
> >>>>>>
> >>>>>> Thank you~
> >>>>>>
> >>>>>> Xintong Song
> >>>>>>
> >>>>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>>>>>> Till, thanks a lot for the proposal.
> >>>>>>>
> >>>>>>> Even if the initial phase is only to support scale-up, maybe the "ScaleUpController" interface should be called "RescaleController" so that in the future scale-down can be added.
> >>>>>>>
> >>>>>>> On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> I would like to start a discussion about adding a new type of scheduler to Flink.
> >>>>>>>> The declarative scheduler will first declare the required resources and wait for them before deciding on the actual parallelism of a job. Thereby it can better handle situations where resources cannot be fully fulfilled. Moreover, it will act as a building block for the reactive mode, where Flink should scale to the maximum of the currently available resources.
> >>>>>>>>
> >>>>>>>> Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.
> >>>>>>>>
> >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Till
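(As a rough illustration of the "declare resources, wait, then decide on the parallelism" behaviour described above, and of the resource-wait timeout mentioned at the top of this thread, here is a simplified, hypothetical Java sketch. It is not the actual scheduler implementation; the real scheduler is event-driven and state-machine based rather than a polling loop, and the names below are invented for the example.)

import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified sketch of the declarative scheduling idea: declare the
 * desired resources, wait (up to a timeout) in a "waiting for resources" state, and
 * then fix the actual parallelism based on what is available. Not the real scheduler.
 */
final class DeclarativeSchedulingSketch {

    private final int desiredSlots;          // what the job would like to have
    private final int minimumSlots;          // below this the job cannot run at all
    private final Duration resourceTimeout;  // how long to stay in "waiting for resources"

    DeclarativeSchedulingSketch(int desiredSlots, int minimumSlots, Duration resourceTimeout) {
        this.desiredSlots = desiredSlots;
        this.minimumSlots = minimumSlots;
        this.resourceTimeout = resourceTimeout;
    }

    /** Returns the slot count to run with, or -1 if even the minimum was never reached. */
    int waitForResourcesAndDecide(ResourceView resources) throws InterruptedException {
        Instant deadline = Instant.now().plus(resourceTimeout);
        while (Instant.now().isBefore(deadline)) {
            if (resources.availableSlots() >= desiredSlots) {
                return desiredSlots;          // everything we asked for is there
            }
            Thread.sleep(100);                // poll; the real scheduler reacts to events
        }
        int available = resources.availableSlots();
        return available >= minimumSlots ? available : -1;  // run with what we got, or give up
    }

    /** Stand-in for whatever component knows the currently available slots. */
    interface ResourceView {
        int availableSlots();
    }
}

(With reactive mode in mind, the same decision point is where the scheduler could later pick a larger parallelism whenever more slots than the current plan become available.)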