Thanks for the explanations, Till. Keeping the initial design as simple as possible sounds good to me. There's no further concern from my side.
Thank you~

Xintong Song

On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <reed...@gmail.com> wrote:

> Thanks Till for the explanation and the follow-up actions.
> That sounds good to me.
>
> Thanks,
> Zhu
>
> On Tue, Jan 26, 2021 at 7:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Thanks a lot for all your feedback. Let me try to answer it.
> >
> > # ScaleUpController vs. RescaleController
> >
> > At the moment, the idea of the declarative scheduler is to run a job with a parallelism which is as close to the target value as possible but never exceeds it. Since the target value is currently fixed (this will change once auto-scaling is supported), we never have to scale down. In fact, scale-down events only happen if the system loses slots, and then you cannot choose whether to scale down or not. This, and the idea to keep the initial design as simple as possible, motivated the naming of ScaleUpController. Once we support auto-scaling, we might have to rename this interface. However, since I don't fully know how things will look with auto-scaling, I would like to refrain from this change for now.
> >
> > # Relationship between declarative scheduler and existing implementations
> >
> > The declarative scheduler will implement the SchedulerNG interface. At the moment I am not aware of any required changes to this interface. Also, the way the JobMaster interacts with the scheduler won't change, to the best of my knowledge.
> >
> > # SlotAllocator's awareness of the ExecutionGraph
> >
> > The idea is to make the SlotAllocator not aware of the ExecutionGraph, because the ExecutionGraph can only be created after the SlotAllocator has decided on the parallelism the job can be run with. Moreover, the idea is that the JobInformation instance contains all the information about a job that is required to make this decision.
> >
> > The SlotAllocator also reserves the slots for a job before the ExecutionGraph is created. Consequently, we need a way to associate the slots with the Executions of the ExecutionGraph. Here we have decided to use the ExecutionVertexID as the identifier. We could also introduce a new identifier as long as we can map it to the ExecutionVertexID. We could, for example, use the JobVertexID and the subtask id as the identifier, but this is exactly what the ExecutionVertexID is. That's why we decided to reuse it for the time being.
> >
> > # ScaleUpController
> >
> > ## Cumulative parallelism
> >
> > Yes, the cumulative parallelism is the sum of the parallelisms of all tasks.
> >
> > ## When to call the ScaleUpController
> >
> > Yes, the idea is that the scheduler will check whether the job could be run with an increased parallelism whenever new slots become available. If this is the case, then we will ask the ScaleUpController whether we actually should scale up (e.g. whether the increase is significant enough or whether enough time has passed since the last scale-up operation).
> >
> > ## Do we provide enough information to the ScaleUpController
> >
> > I am pretty sure that we don't provide the ScaleUpController with enough information to make the best possible decision. We wanted to keep it simple in the first version and iterate on the interface with the help of user feedback. I think that this interface won't fundamentally change the scheduler and, hence, starting with a simple interface shouldn't block future extensions.
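As a rough illustration of the contract described above, here is a minimal sketch of a ScaleUpController together with one possible policy combining the two criteria Till mentions (a significant enough increase and enough time since the last scale-up). The method name and parameters follow the discussion in this thread; the exact interface in the FLIP may differ, and the thresholds are illustrative only.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/**
 * Sketch of the ScaleUpController contract discussed above: given the current cumulative
 * parallelism (sum of the parallelisms of all tasks) and the cumulative parallelism that
 * would be reachable with the newly available slots, decide whether restarting for a
 * scale-up is worthwhile.
 */
interface ScaleUpController {
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

/** Example policy: require a minimum relative increase and a cooldown between scale-ups. */
final class SimpleScaleUpController implements ScaleUpController {

    private final Clock clock;
    private final double minRelativeIncrease; // e.g. 0.25 = at least 25 % more parallelism
    private final Duration cooldown;          // minimum time between two scale-ups
    private Instant lastScaleUp = Instant.EPOCH;

    SimpleScaleUpController(Clock clock, double minRelativeIncrease, Duration cooldown) {
        this.clock = clock;
        this.minRelativeIncrease = minRelativeIncrease;
        this.cooldown = cooldown;
    }

    @Override
    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
        final double relativeIncrease =
                (newCumulativeParallelism - currentCumulativeParallelism)
                        / (double) currentCumulativeParallelism;
        final boolean significant = relativeIncrease >= minRelativeIncrease;
        final boolean cooledDown =
                Duration.between(lastScaleUp, clock.instant()).compareTo(cooldown) >= 0;

        if (significant && cooledDown) {
            lastScaleUp = clock.instant();
            return true;
        }
        return false;
    }
}
```

A caller could instantiate it with, for example, `new SimpleScaleUpController(Clock.systemUTC(), 0.25, Duration.ofMinutes(5))`.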
> > # Cluster vs. job configuration
> >
> > I think you are right that it would be most flexible if one could select a scheduler on a per-job basis, falling back to the cluster configuration if nothing has been specified. For the sake of simplicity and to narrow down the scope, I would consider this a follow-up.
> >
> > # Performance on failover
> >
> > I agree that the performance won't be optimal in the failover case because of 1) having to recreate the ExecutionGraph and 2) redundant IO operations. For 1), I agree that FLINK-21110 will help a lot. For 2), moving the IO operations out of the ExecutionGraph creation could be a good improvement. With the same argument as above, I would consider this a follow-up, because I would like to keep the initial design as simple as possible and I think that these performance optimizations are not required to make this feature work.
> >
> > # Lost execution history
> >
> > You are right, Zhu Zhu, that recreating the ExecutionGraph causes the loss of information. We are currently investigating which information is strictly required and needs to be maintained across ExecutionGraph creations (mainly for tests to succeed). One idea is to either store this information outside of the ExecutionGraph or to initialize the new ExecutionGraph with the information from the previous instance. Most likely, we won't guarantee that all counters, timestamps and metrics are correctly maintained across failovers in the first version, though. It is much the same argument as above: this is not strictly required to make the feature work. Maybe this means that the feature is not production ready for some users, but I think this is ok.
> >
> > In general, to fully integrate the declarative scheduler with the web UI we have to be able to display changing ExecutionGraphs. Ideally, we would have something like a timeline where one can select the point in time for which to display the state of the job execution. If one goes all the way to the right, the system shows the live state; all other positions show the present time minus some offset.
> >
> > I will add the follow-ups to the FLIP as potential improvements in order to keep track of them.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> >> Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>
> >>
> >> The design looks generally good to me. I have 2 concerns though:
> >>
> >> # performance on failover
> >> What I can see is that an ExecutionGraph will be built and initialized on each task failure. This process can be slow:
> >> 1. the topology building can be very slow (much slower than the scheduling or restarting process) when the job scale becomes large (see FLINK-20612). Luckily FLINK-21110 will improve it.
> >> 2. the input/output format initialization can be slow due to possible IO work against external services. Maybe we should extract it from the ExecutionGraph building?
> >>
> >> # execution history lost
> >> After building and using a new ExecutionGraph, the execution history in the old graph will be lost, including status timestamps of the job as well as prior execution attempts and their failures. Should we let the new ExecutionGraph inherit this information from the prior one? Maybe as a future plan, with further discussions regarding the varying parallelism.
> >>
> >> Thanks,
> >> Zhu
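To make the execution-history concern above a bit more tangible, the following is a purely hypothetical sketch of the kind of state a new ExecutionGraph would need to be seeded with (or that would have to live outside the graph), based only on the items named in the thread; none of these types exist in Flink or the FLIP.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical carrier for the history that would otherwise be lost when a new
 * ExecutionGraph is built: job status timestamps plus prior execution attempts and
 * their failures. All names are invented for illustration.
 */
record PriorExecutionHistory(
        // job status name (e.g. "RUNNING") -> timestamp in epoch millis
        Map<String, Long> jobStatusTimestamps,
        // attempts that ran against previous ExecutionGraph instances
        List<PriorAttempt> priorAttempts) {}

record PriorAttempt(
        String executionVertexId,  // subtask the attempt belonged to
        int attemptNumber,
        long startTimestamp,
        Throwable failureCause) {} // null if the attempt did not fail
```

Till mentions two options for such state: keep it outside the ExecutionGraph, or use it to initialize the next ExecutionGraph instance; a snapshot like this would be agnostic to that choice.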
> >>
> >> On Sun, Jan 24, 2021 at 11:55 AM Xintong Song <tonysong...@gmail.com> wrote:
> >>
> >>> Thanks for preparing the FLIP and starting the discussion, Till.
> >>>
> >>> I have a few questions trying to understand the design.
> >>>
> >>> ## What is the relationship between the current and new schedulers?
> >>> IIUC, the new declarative scheduler will coexist with the current scheduler, as an alternative that the user needs to explicitly switch to. Does it then require any changes to the scheduler interfaces and to how the JobMaster interacts with it?
> >>>
> >>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> >>> Looking at the interfaces, it seems to me that `SlotAllocator` only takes `JobInformation` and `VertexInformation` as topology inputs. However, it generates `ParallelismAndResourceAssignments`, which maps slots to `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is generated outside `SlotAllocator` while `ExecutionVertexID`s are generated inside `SlotAllocator`.
> >>>
> >>> ## About `ScaleUpController#canScaleUp`
> >>>
> >>> ### What is cumulative parallelism?
> >>> The interface shows that the cumulative parallelism is a single number per job graph. I assume this is the sum of the parallelisms of all vertices?
> >>>
> >>> ### Is this interface expected to be called before or after `SlotAllocator#determineParallelism`?
> >>> IIUC, when new resources appear, the scheduler always calls `SlotAllocator#determineParallelism` to generate a new plan based on the available resources, and `ScaleUpController#canScaleUp` is then called to decide whether to apply the new plan, according to whether the increase is significant, how long the job has been running since the last restart, etc. Is that correct?
> >>>
> >>> ### The cumulative parallelism may not be enough for deciding whether the job should be scaled up.
> >>> I'm assuming my understanding of the above questions is correct. Please correct me if not.
> >>>
> >>> I noticed Chesnay's comment on the wiki page about making the decision based on when the job last entered the execution state.
> >>>
> >>> In addition to that, there could also be situations where jobs may not benefit much from an increment in cumulative parallelism. E.g., for a topology A -> B, where A and B are in different slot sharing groups and the current parallelism of both A and B is 2: when 1 new slot appears, `SlotAllocator` may suggest increasing the parallelism of A to 3. But this may not be a significantly beneficial change for the job, because the overall performance is still bottlenecked by the parallelism of B.
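For reference, here is a simplified sketch of the boundary discussed above, using placeholder types rather than the real Flink classes and an assumed method shape: the `SlotAllocator` sees only job/vertex information and the free slots, never the `ExecutionGraph`, and expresses its result per subtask via (JobVertexID, subtask index) keys, which is exactly what an `ExecutionVertexID` encodes. In the A -> B example, such an allocator might propose raising the cumulative parallelism from 4 to 5, which illustrates the point that the cumulative number alone says little about the actual benefit.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/** Placeholder identifiers standing in for Flink's JobVertexID / ExecutionVertexID / slot IDs. */
record JobVertexId(String name) {}
record ExecutionVertexId(JobVertexId jobVertex, int subtaskIndex) {}
record SlotId(String id) {}

/** Per-vertex topology information that is available before any ExecutionGraph exists. */
record VertexInformation(JobVertexId id, int maxParallelism, String slotSharingGroup) {}
record JobInformation(Collection<VertexInformation> vertices) {}

/** Allocation result: chosen parallelism per vertex plus the reserved slot for each subtask. */
record ParallelismAndResourceAssignments(
        Map<JobVertexId, Integer> parallelismPerVertex,
        Map<ExecutionVertexId, SlotId> slotPerSubtask) {}

/**
 * Sketch of the SlotAllocator boundary: it decides the parallelism and reserves slots based
 * solely on JobInformation and the free slots; the scheduler builds the ExecutionGraph
 * afterwards and maps the (JobVertexId, subtaskIndex) keys onto it.
 */
interface SlotAllocator {
    /** Returns an assignment if the job can be run with the given free slots, empty otherwise. */
    Optional<ParallelismAndResourceAssignments> determineParallelismAndAssignSlots(
            JobInformation jobInformation, Collection<SlotId> freeSlots);
}
```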
> >>>
> >>> ## Cluster vs. Job configuration
> >>> I'm not entirely sure about specifying which scheduler to use via a cluster-level configuration option. Each job in a Flink cluster has its own JobMaster, and which scheduler to use is internal to that JobMaster. I understand that the declarative scheduler requires the cluster to use declarative resource management. However, other jobs should still be allowed to use other scheduler implementations that also support declarative resource management (e.g. the current `DefaultScheduler`).
> >>> Maybe we should consider the cluster-level configuration option as a default scheduler, and allow a job to specify a different scheduler in its execution config. This is similar to how we specify which state backend to use.
> >>>
> >>> ## Minor: it might be better to also show in the state machine figure that it can go from `Executing` to `Restarting` when new resources appear.
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>>
> >>> > Till, thanks a lot for the proposal.
> >>> >
> >>> > Even if the initial phase only supports scale-up, maybe the "ScaleUpController" interface should be called "RescaleController" so that scale-down can be added in the future.
> >>> >
> >>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org> wrote:
> >>> >
> >>> > > Hi everyone,
> >>> > >
> >>> > > I would like to start a discussion about adding a new type of scheduler to Flink. The declarative scheduler will first declare the required resources and wait for them before deciding on the actual parallelism of a job. Thereby it can better handle situations where the resource requirements cannot be fully fulfilled. Moreover, it will act as a building block for the reactive mode, where Flink should scale to the maximum of the currently available resources.
> >>> > >
> >>> > > Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.
> >>> > >
> >>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >>> > >
> >>> > > Cheers,
> >>> > > Till
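As a hedged illustration of the configuration discussion above: cluster-wide defaults in Flink are plain key/value entries in the Configuration, so the selection could look roughly like the sketch below. The key name `jobmanager.scheduler` and the value `declarative` are assumptions for the sake of the example, and the per-job override that Xintong suggests (analogous to choosing a state backend) does not exist; it is only hinted at in a comment.

```java
import org.apache.flink.configuration.Configuration;

/** Sketch of the "cluster default, optional per-job override" idea from the thread. */
public class SchedulerSelectionSketch {

    public static void main(String[] args) {
        // Cluster-level default, e.g. what flink-conf.yaml would contain.
        // The key and value are assumptions, not the option actually defined by the FLIP.
        Configuration clusterConfig = new Configuration();
        clusterConfig.setString("jobmanager.scheduler", "declarative");

        // Hypothetical per-job override, analogous to how a state backend can be chosen
        // per job; no such setter exists today:
        // env.setScheduler(SchedulerType.DEFAULT);
    }
}
```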