Thanks for preparing the FLIP and driving the discussion, Till. All of my questions have already been answered in the previous discussion.
I just have one minor reminder regarding using the ExecutionVertexID as the
identifier. The JobVertexID is generated deterministically from the topology
rather than randomly. Thus, it is not guaranteed to be unique across
different jobs and different executions. This characteristic is also
inherited by the ExecutionVertexID. IIUC, the
ParallelismAndResourceAssignments is a job-level concept, so it will work
well.

Best,
Yangze Guo

On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <tonysong...@gmail.com> wrote:
>
> Thanks for the explanations, Till.
> Keeping the initial design as simple as possible sounds good to me.
> There's no further concern from my side.
>
> Thank you~
>
> Xintong Song
>
> On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > Thanks Till for the explanation and the follow-up actions.
> > That sounds good to me.
> >
> > Thanks,
> > Zhu
> >
> > On Tue, Jan 26, 2021 at 7:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > > Thanks a lot for all your feedback. Let me try to answer it.
> > >
> > > # ScaleUpController vs. RescaleController
> > >
> > > At the moment, the idea of the declarative scheduler is to run a job
> > > with a parallelism which is as close to the target value as possible
> > > but never exceeds it. Since the target value is currently fixed (this
> > > will change once auto-scaling is supported), we never have to scale
> > > down. In fact, scale-down events only happen if the system loses
> > > slots, and then you cannot choose whether to scale down or not. This,
> > > and the idea to keep the initial design as simple as possible,
> > > motivated the naming of the ScaleUpController. Once we support
> > > auto-scaling, we might have to rename this interface. However, since I
> > > don't fully know how things will look with auto-scaling, I would like
> > > to refrain from this change now.
> > >
> > > # Relationship between the declarative scheduler and existing
> > > implementations
> > >
> > > The declarative scheduler will implement the SchedulerNG interface. At
> > > the moment I am not aware of any required changes to this interface.
> > > Also, to the best of my knowledge, the way the JobMaster interacts
> > > with the scheduler won't change.
> > >
> > > # SlotAllocator's awareness of the ExecutionGraph
> > >
> > > The idea is to make the SlotAllocator unaware of the ExecutionGraph,
> > > because the ExecutionGraph can only be created after the SlotAllocator
> > > has decided on the parallelism the job can be run with. Moreover, the
> > > idea is that the JobInformation instance contains all the information
> > > about a job that is required to make this decision.
> > >
> > > The SlotAllocator also reserves the slots for a job before the
> > > ExecutionGraph is created. Consequently, we need a way to associate
> > > the slots with the Executions of the ExecutionGraph. Here we have
> > > decided to use the ExecutionVertexID as the identifier. We could also
> > > introduce a new identifier as long as we can map it to the
> > > ExecutionVertexID. We could, for example, use the JobVertexID and the
> > > subtask index as the identifier, but this is exactly what the
> > > ExecutionVertexID is. That's why we decided to reuse it for the time
> > > being.
> > >
> > > # ScaleUpController
> > >
> > > ## Cumulative parallelism
> > >
> > > Yes, the cumulative parallelism is the sum of the parallelism of all
> > > tasks.
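> > > To make this concrete, a rough sketch of how the cumulative
> > > parallelism could be computed and handed to the controller (the exact
> > > names may still change in the FLIP, and the call site below is only an
> > > illustration):
> > >
> > >     // Cumulative parallelism: a single number for the whole job,
> > >     // i.e. the sum of the parallelism of all vertices.
> > >     int newCumulativeParallelism = 0;
> > >     for (JobInformation.VertexInformation vertex :
> > >             jobInformation.getVertices()) {
> > >         newCumulativeParallelism += vertex.getParallelism();
> > >     }
> > >
> > >     // Ask the controller whether the gain justifies a restart;
> > >     // currentCumulativeParallelism stems from the running plan.
> > >     boolean shouldScaleUp = scaleUpController.canScaleUp(
> > >             currentCumulativeParallelism, newCumulativeParallelism);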
> > >
> > > ## When to call the ScaleUpController
> > >
> > > Yes, the idea is that the scheduler will check whether it can run the
> > > job with an increased parallelism whenever new slots become available.
> > > If this is the case, then we will ask the ScaleUpController whether we
> > > actually should scale up (e.g., whether the increase is significant
> > > enough or whether enough time has passed since the last scale-up
> > > operation).
> > >
> > > ## Do we provide enough information to the ScaleUpController
> > >
> > > I am pretty sure that we don't provide the ScaleUpController with
> > > enough information to make the best possible decision. We wanted to
> > > keep it simple in the first version and iterate on the interface with
> > > the help of user feedback. Since this interface won't fundamentally
> > > change the scheduler, starting with a simple interface shouldn't block
> > > future extensions.
> > >
> > > # Cluster vs. job configuration
> > >
> > > I think you are right that it would be most flexible if one could
> > > select a scheduler on a per-job basis, falling back to the cluster
> > > configuration if nothing has been specified. For the sake of
> > > simplicity and to narrow down the scope, I would consider this a
> > > follow-up.
> > >
> > > # Performance on failover
> > >
> > > I agree that the performance won't be optimal in the failover case
> > > because of 1) having to recreate the EG and 2) redundant IO
> > > operations. For 1), I agree that FLINK-21110 will help a lot. For 2),
> > > moving the IO operations out of the EG could be a good improvement.
> > > With the same argument as above, I would consider this a follow-up,
> > > because I would like to keep the initial design as simple as possible,
> > > and these performance optimizations are not required to make this
> > > feature work.
> > >
> > > # Lost execution history
> > >
> > > You are right, Zhu Zhu, that recreating the ExecutionGraph causes the
> > > loss of information. We are currently investigating which information
> > > is strictly required and needs to be maintained across ExecutionGraph
> > > creations (mainly for tests to succeed). One idea is to either store
> > > this information outside of the ExecutionGraph or to initialize the
> > > ExecutionGraph with the information from the previous instance. Most
> > > likely, we won't give a guarantee that all counters, timestamps and
> > > metrics are correctly maintained across failovers in the first
> > > version, though. It is much the same argument as above: this is not
> > > strictly required to make the feature work. Maybe this means that the
> > > feature is not production ready for some users, but I think this is
> > > ok.
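> > > One way to picture the second idea, with all names here being
> > > hypothetical and only meant to illustrate the direction:
> > >
> > >     // Carry selected history over from the previous ExecutionGraph
> > >     // instance when building the new one after a restart.
> > >     ExecutionHistory history = previousExecutionGraph == null
> > >             ? ExecutionHistory.empty()
> > >             : ExecutionHistory.from(previousExecutionGraph);
> > >     // history: status timestamps, prior attempts and their failures
> > >     ExecutionGraph newGraph =
> > >             createExecutionGraph(jobInformation, parallelism, history);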
> > > In general, to fully integrate the declarative scheduler with the web
> > > UI, we have to be able to display changing ExecutionGraphs. Ideally,
> > > we would have something like a timeline where one can select the time
> > > for which to display the state of the job execution. If one goes all
> > > the way to the right, the system shows the live state; all other
> > > positions show the present time minus some offset.
> > >
> > > I will add the follow-ups to the FLIP as potential improvements in
> > > order to keep track of them.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> > >
> > >> Thanks for the proposal, @Till Rohrmann <trohrm...@apache.org>!
> > >>
> > >> The design looks generally good to me. I have two concerns, though:
> > >>
> > >> # Performance on failover
> > >> What I can see is that an ExecutionGraph will be built and
> > >> initialized on each task failure. This process can be slow:
> > >> 1. The topology building can be very slow (much slower than the
> > >> scheduling or restarting process) when the job scale becomes large
> > >> (see FLINK-20612). Luckily, FLINK-21110 will improve it.
> > >> 2. The input/output format handling can be slow due to possible IO
> > >> work against external services. Maybe we should extract it out of
> > >> the ExecutionGraph building?
> > >>
> > >> # Execution history lost
> > >> After building and using a new ExecutionGraph, the execution history
> > >> in the old graph will be lost, including the status timestamps of
> > >> the job and the prior execution attempts and their failures. Should
> > >> we let the new EG inherit this information from the prior EG? Maybe
> > >> as a future plan, with further discussions regarding the varying
> > >> parallelism.
> > >>
> > >> Thanks,
> > >> Zhu
> > >>
> > >> On Sun, Jan 24, 2021 at 11:55 AM Xintong Song <tonysong...@gmail.com> wrote:
> > >>
> > >>> Thanks for preparing the FLIP and starting the discussion, Till.
> > >>>
> > >>> I have a few questions trying to understand the design.
> > >>>
> > >>> ## What is the relationship between the current and new schedulers?
> > >>> IIUC, the new declarative scheduler will coexist with the current
> > >>> scheduler, as an alternative that the user needs to explicitly
> > >>> switch to. Does it then require any changes to the scheduler
> > >>> interfaces and to how the JobMaster interacts with it?
> > >>>
> > >>> ## Is `SlotAllocator` aware of the `ExecutionGraph`?
> > >>> Looking at the interfaces, it seems to me that `SlotAllocator` only
> > >>> takes `JobInformation` and `VertexInformation` as topology inputs.
> > >>> However, it generates `ParallelismAndResourceAssignments`, which
> > >>> maps slots to `ExecutionVertexID`s. It's a bit confusing that the
> > >>> `ExecutionGraph` is generated outside the `SlotAllocator` while
> > >>> `ExecutionVertexID`s are generated inside it.
> > >>>
> > >>> ## About `ScaleUpController#canScaleUp`
> > >>>
> > >>> ### What is the cumulative parallelism?
> > >>> The interface shows that the cumulative parallelism is a single
> > >>> number per job graph. I assume this is the sum of the parallelism
> > >>> of all vertices?
> > >>>
> > >>> ### Is this interface expected to be called before or after
> > >>> `SlotAllocator#determineParallelism`?
> > >>> IIUC, when new resources appear, the scheduler always calls
> > >>> `SlotAllocator#determineParallelism` to generate a new plan based
> > >>> on the available resources, and `ScaleUpController#canScaleUp` is
> > >>> then called to decide whether to apply the new plan, according to
> > >>> whether the increase is significant, how long the job has been
> > >>> running since the last restart, etc. Is that correct?
> > >>>
> > >>> ### The cumulative parallelism may not be enough for deciding
> > >>> whether the job should be scaled up.
> > >>> I'm assuming my understanding of the above questions is correct.
> > >>> Please correct me if not.
> > >>>
> > >>> I noticed Chesnay's comment on the wiki page about making the
> > >>> decision based on when the job last entered the Executing state.
> > >>> In addition to that, there could also be situations where jobs may
> > >>> not benefit much from an increment in cumulative parallelism.
> > >>> E.g., consider a topology A -> B, where A and B are in different
> > >>> slot sharing groups and the current parallelism of both A and B is
> > >>> 2. When 1 new slot appears, `SlotAllocator` may suggest increasing
> > >>> the parallelism of A to 3. But this may not be a significantly
> > >>> beneficial change for the job, because the overall performance is
> > >>> still bottlenecked by the parallelism of B.
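> > >>> As a toy illustration of this point (assuming, just for the sake
> > >>> of the example, that each subtask contributes one unit of
> > >>> throughput and that the slowest stage caps the whole pipeline):
> > >>>
> > >>>     int parallelismOfA = 3; // was 2 before the new slot appeared
> > >>>     int parallelismOfB = 2;
> > >>>     // Pipeline throughput is capped by the least-parallel stage:
> > >>>     // A=2, B=2 -> cap 2;  A=3, B=2 -> cap still 2, even though
> > >>>     // the cumulative parallelism grew from 4 to 5.
> > >>>     int throughputCap = Math.min(parallelismOfA, parallelismOfB);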
> > >>> ## Cluster vs. Job configuration
> > >>> I'm not entirely sure about specifying which scheduler to use via
> > >>> a cluster-level configuration option. Each job in a Flink cluster
> > >>> has its own JobMaster, and which scheduler to use is internal to
> > >>> that JobMaster. I understand that the declarative scheduler
> > >>> requires the cluster to use declarative resource management.
> > >>> However, other jobs should still be allowed to use other scheduler
> > >>> implementations that also support declarative resource management
> > >>> (e.g., the current `DefaultScheduler`). Maybe we should consider
> > >>> the cluster-level configuration option as a default scheduler and
> > >>> allow a job to specify a different scheduler in its execution
> > >>> config. This is similar to how we specify which state backend to
> > >>> use.
> > >>>
> > >>> ## Minor: It might be better to also show in the state machine
> > >>> figure that it can go from `Executing` to `Restarting` when new
> > >>> resources appear.
> > >>>
> > >>> Thank you~
> > >>>
> > >>> Xintong Song
> > >>>
> > >>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
> > >>>
> > >>> > Till, thanks a lot for the proposal.
> > >>> >
> > >>> > Even if the initial phase only supports scale-up, maybe the
> > >>> > "ScaleUpController" interface should be called
> > >>> > "RescaleController", so that scale-down can be added in the
> > >>> > future.
> > >>> >
> > >>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org>
> > >>> > wrote:
> > >>> >
> > >>> > > Hi everyone,
> > >>> > >
> > >>> > > I would like to start a discussion about adding a new type of
> > >>> > > scheduler to Flink. The declarative scheduler will first
> > >>> > > declare the required resources and wait for them before
> > >>> > > deciding on the actual parallelism of a job. Thereby it can
> > >>> > > better handle situations where resources cannot be fully
> > >>> > > fulfilled. Moreover, it will act as a building block for the
> > >>> > > reactive mode, where Flink should scale to the maximum of the
> > >>> > > currently available resources.
> > >>> > >
> > >>> > > Please find more details in the FLIP wiki document [1].
> > >>> > > Looking forward to your feedback.
> > >>> > >
> > >>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> > >>> > >
> > >>> > > Cheers,
> > >>> > > Till
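> > >>> > > P.S. In rough, Java-flavored pseudo-code (my shorthand here,
> > >>> > > not the final API from the FLIP), the basic idea is:
> > >>> > >
> > >>> > >     // 1. Declare what the job would ideally need, then wait
> > >>> > >     //    until enough slots arrive or a timeout fires.
> > >>> > >     declareRequiredResources(jobInformation);
> > >>> > >     waitForSlotsOrTimeout();
> > >>> > >
> > >>> > >     // 2. Decide the actual parallelism from the slots we got,
> > >>> > >     //    staying as close to the target as possible.
> > >>> > >     VertexParallelism actual =
> > >>> > >             determineParallelism(jobInformation, availableSlots);
> > >>> > >
> > >>> > >     // 3. Only now build the ExecutionGraph and start executing.
> > >>> > >     executeWith(actual);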