Thanks, Till, for the explanation and the follow-up actions. That sounds good to me.
Thanks,
Zhu

Till Rohrmann <trohrm...@apache.org> wrote on Tue, Jan 26, 2021 at 7:28 PM:

> Thanks a lot for all your feedback. Let me try to answer it.
>
> # ScaleUpController vs. RescaleController
>
> At the moment, the idea of the declarative scheduler is to run a job with a parallelism which is as close to the target value as possible but never exceeds it. Since the target value is currently fixed (this will change once auto-scaling is supported), we never have to scale down. In fact, scale-down events only happen if the system loses slots, and then you cannot choose whether to scale down or not. This, together with the idea of keeping the initial design as simple as possible, motivated the naming of ScaleUpController. Once we support auto-scaling, we might have to rename this interface. However, since I don't fully know how things will look with auto-scaling, I would like to refrain from this change now.
>
> # Relationship between declarative scheduler and existing implementations
>
> The declarative scheduler will implement the SchedulerNG interface. At the moment I am not aware of any required changes to this interface. Also, the way the JobMaster interacts with the scheduler won't change, to the best of my knowledge.
>
> # SlotAllocator awareness of the ExecutionGraph
>
> The idea is to make the SlotAllocator not aware of the ExecutionGraph, because the ExecutionGraph can only be created after the SlotAllocator has decided on the parallelism the job can be run with. Moreover, the idea is that the JobInformation instance contains all the information about a job that is required to make this decision.
>
> The SlotAllocator also reserves the slots for a job before the ExecutionGraph is created. Consequently, we need a way to associate the slots with the Executions of the ExecutionGraph. Here we have decided to use the ExecutionVertexID as the identifier. We could also introduce a new identifier as long as we can map it to the ExecutionVertexID. We could, for example, use the JobVertexID and the subtask index as the identifier, but this is exactly what the ExecutionVertexID is. That's why we decided to reuse it for the time being.
>
> # ScaleUpController
>
> ## Cumulative parallelism
>
> Yes, the cumulative parallelism is the sum of the parallelism of all tasks.
>
> ## When to call the ScaleUpController
>
> Yes, the idea is that the scheduler will check whether the job can be run with an increased parallelism whenever new slots become available. If this is the case, then we will ask the ScaleUpController whether we actually should scale up (e.g. whether the increase is significant enough, or whether enough time has passed since the last scale-up operation).
>
> ## Do we provide enough information to the ScaleUpController
>
> I am pretty sure that we don't provide the ScaleUpController with enough information to make the best possible decision. We wanted to keep it simple in the first version and iterate on the interface with the help of user feedback. I think that this interface won't fundamentally change the scheduler and, hence, starting with a simple interface shouldn't block future extensions.
>
> # Cluster vs. job configuration
>
> I think you are right that it would be most flexible if one could select a scheduler on a per-job basis, falling back to the cluster configuration if nothing has been specified. For the sake of simplicity and of narrowing down the scope, I would consider this a follow-up.
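A minimal sketch of the ScaleUpController contract described above, assuming a one-method interface; the name and signature follow the wording of this thread rather than the final FLIP API:

public interface ScaleUpController {

    /**
     * Consulted by the scheduler after new slots have become available and a higher
     * achievable parallelism has been determined. The cumulative parallelism is a
     * single number per job: the sum of the parallelism of all tasks. Returning
     * false keeps the job running at its current parallelism.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}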
> # Performance on failover
>
> I agree that the performance won't be optimal in the failover case because of 1) having to recreate the ExecutionGraph and 2) redundant IO operations. For 1) I agree that FLINK-21110 will help a lot. For 2), moving the IO operations out of the ExecutionGraph could be a good improvement. With the same argument as above, I would consider this a follow-up, because I would like to keep the initial design as simple as possible and I think that these performance optimizations are not required to make this feature work.
>
> # Lost execution history
>
> You are right, Zhu Zhu, that recreating the ExecutionGraph causes the loss of information. We are currently investigating which information is strictly required and needs to be maintained across ExecutionGraph creations (mainly for tests to succeed). One idea is to either store this information outside of the ExecutionGraph or to initialize the ExecutionGraph with the information from the previous instance. Most likely, we won't give a guarantee that all counters, timestamps and metrics are correctly maintained across failovers in the first version, though. It is a bit the same argument as above: this is not strictly required to make this feature work. Maybe this means that this feature is not production ready for some users, but I think this is ok.
>
> In general, to fully integrate the declarative scheduler with the web UI we have to be able to display changing ExecutionGraphs. Ideally, we would have something like a timeline where one can select the time for which to display the state of the job execution. If one goes all the way to the right, the system shows the live state, and all other positions are the present time minus some offset.
>
> I will add the follow-ups to the FLIP as potential improvements in order to keep track of them.
>
> Cheers,
> Till
>
> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:
>
>> Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>
>>
>> The design looks generally good to me. I have 2 concerns though:
>>
>> # Performance on failover
>> An ExecutionGraph will be built and initialized on each task failure. This process can be slow:
>> 1. The topology building can be very slow (much slower than the scheduling or restarting process) when the job scale becomes large (see FLINK-20612). Luckily, FLINK-21110 will improve it.
>> 2. The input/output format can be slow due to possible IO work against external services. Maybe we should extract it out of the ExecutionGraph building?
>>
>> # Execution history lost
>> After building and using a new ExecutionGraph, the execution history in the old graph will be lost, including the status timestamps of the job, prior execution attempts and their failures. Should we let the new ExecutionGraph inherit this information from the prior one? Maybe as a future plan, with further discussions regarding the varying parallelism.
>>
>> Thanks,
>> Zhu
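As a purely hypothetical sketch of the two options Till mentions above for the execution history (keeping it outside the ExecutionGraph, or seeding the new graph from the previous instance); GraphStats and the method names are placeholders, not Flink APIs:

import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch only; not part of Flink. */
public final class ExecutionHistorySketch {

    /** Stand-in for the history that should survive an ExecutionGraph swap. */
    public static final class GraphStats {
        public long numberOfRestarts;
        public final Map<String, Long> statusTimestamps = new HashMap<>();
    }

    private final GraphStats accumulated = new GraphStats();

    /** Option 1: keep the history outside the graph; called before the old graph is dropped. */
    public void remember(GraphStats previousGraph) {
        accumulated.numberOfRestarts = previousGraph.numberOfRestarts;
        accumulated.statusTimestamps.putAll(previousGraph.statusTimestamps);
    }

    /** Option 2: initialize the newly created graph with the accumulated history. */
    public void seed(GraphStats newGraph) {
        newGraph.numberOfRestarts = accumulated.numberOfRestarts;
        newGraph.statusTimestamps.putAll(accumulated.statusTimestamps);
    }
}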
>> Xintong Song <tonysong...@gmail.com> wrote on Sun, Jan 24, 2021 at 11:55 AM:
>>
>>> Thanks for preparing the FLIP and starting the discussion, Till.
>>>
>>> I have a few questions trying to understand the design.
>>>
>>> ## What is the relationship between the current and new schedulers?
>>> IIUC, the new declarative scheduler will coexist with the current scheduler, as an alternative that the user needs to explicitly switch to. Then does it require any changes to the scheduler interfaces and to how the JobMaster interacts with it?
>>>
>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
>>> Looking at the interfaces, it seems to me that `SlotAllocator` only takes `JobInformation` and `VertexInformation` as topology inputs. However, it generates `ParallelismAndResourceAssignments`, which maps slots to `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is generated outside `SlotAllocator` while `ExecutionVertexID`s are generated inside `SlotAllocator`.
>>>
>>> ## About `ScaleUpController#canScaleUp`
>>>
>>> ### What is cumulative parallelism?
>>> The interface shows that the cumulative parallelism is a single number per job graph. I assume this is the sum of the parallelism of all vertices?
>>>
>>> ### Is this interface expected to be called before or after `SlotAllocator#determineParallelism`?
>>> IIUC, when new resources appear, the scheduler always calls `SlotAllocator#determineParallelism` to generate a new plan based on the available resources, and `ScaleUpController#canScaleUp` is then called to decide whether to apply the new plan, according to whether the increase is significant, how long the job has been running since the last restart, etc. Is that correct?
>>>
>>> ### The cumulative parallelism may not be enough for deciding whether the job should be scaled up.
>>> I'm assuming my understanding of the above questions is correct. Please correct me if not.
>>>
>>> I noticed Chesnay's comment on the wiki page about making the decision based on when the job last entered the execution state.
>>>
>>> In addition to that, there could also be situations where jobs may not benefit much from an increase in cumulative parallelism. E.g., for a topology A -> B, where A and B are in different slot sharing groups, and the current parallelism of both A and B is 2: when 1 new slot appears, `SlotAllocator` may suggest increasing the parallelism of A to 3. But this may not be a significantly beneficial change for the job, because the overall performance is still bottlenecked by the parallelism of B.
>>>
>>> ## Cluster vs. Job configuration
>>> I'm not entirely sure about specifying which scheduler to be used via a cluster-level configuration option. Each job in a Flink cluster has its own JobMaster, and which scheduler to use is internal to that JobMaster. I understand that the declarative scheduler requires the cluster to use declarative resource management. However, other jobs should still be allowed to use other scheduler implementations that also support declarative resource management (e.g. the current `DefaultScheduler`). Maybe we should treat the cluster-level configuration option as the default scheduler, and allow the job to specify a different scheduler in its execution config. This is similar to how we specify which state backend to use.
>>>
>>> ## Minor: It might be better to also show in the state machine figure that it can go from `Executing` to `Restarting` when new resources appear.
>>>
>>> Thank you~
>>>
>>> Xintong Song
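A rough sketch of the SlotAllocator role as discussed above: it picks a parallelism from the job description and the currently free slots before any ExecutionGraph exists, and its slot assignments are keyed by ExecutionVertexID so they can later be matched to the graph. The method names echo the thread; the exact signatures and the nested placeholder types are assumptions:

import java.util.Collection;
import java.util.Map;
import java.util.Optional;

public interface SlotAllocatorSketch {

    /**
     * Decides which parallelism the job can be run with, given only the job
     * description and the currently available slots. Called before any
     * ExecutionGraph exists; empty if the minimum requirements cannot be met.
     */
    Optional<VertexParallelism> determineParallelism(
            JobInformation jobInformation, Collection<SlotInfo> freeSlots);

    /**
     * Reserves slots for the chosen parallelism. The result is keyed by
     * ExecutionVertexID so the slots can be associated with the Executions of
     * the ExecutionGraph that is created afterwards.
     */
    ParallelismAndResourceAssignments reserveSlots(VertexParallelism vertexParallelism);

    // Placeholder types standing in for the classes discussed in the thread.
    final class JobInformation {}
    final class SlotInfo {}
    final class VertexParallelism {}

    /** Essentially JobVertexID plus subtask index, as Till notes above. */
    final class ExecutionVertexID {
        String jobVertexId;
        int subtaskIndex;
    }

    final class ParallelismAndResourceAssignments {
        Map<ExecutionVertexID, SlotInfo> slotPerExecutionVertex;
    }
}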
>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>> > Till, thanks a lot for the proposal.
>>> >
>>> > Even if the initial phase is only to support scale-up, maybe the "ScaleUpController" interface should be called "RescaleController", so that scale-down can be added in the future.
>>> >
>>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>> >
>>> > > Hi everyone,
>>> > >
>>> > > I would like to start a discussion about adding a new type of scheduler to Flink. The declarative scheduler will first declare the required resources and wait for them before deciding on the actual parallelism of a job. Thereby it can better handle situations where resources cannot be fully fulfilled. Moreover, it will act as a building block for the reactive mode, where Flink should scale to the maximum of the currently available resources.
>>> > >
>>> > > Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.
>>> > >
>>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
>>> > >
>>> > > Cheers,
>>> > > Till
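To illustrate the "significant enough increase" and "enough time since the last rescale" criteria mentioned earlier in the thread, a toy controller might look like the following; the class, thresholds, and signature are made up for illustration and mirror the sketch above:

import java.time.Duration;
import java.time.Instant;

/** Toy example, not Flink code: scale up only on a significant gain and after a cooldown. */
public class MinimumIncreaseScaleUpController {

    private final double minRelativeIncrease;
    private final Duration cooldown;
    private Instant lastScaleUp = Instant.EPOCH;

    public MinimumIncreaseScaleUpController(double minRelativeIncrease, Duration cooldown) {
        this.minRelativeIncrease = minRelativeIncrease;
        this.cooldown = cooldown;
    }

    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
        // Assumes a running job, i.e. currentCumulativeParallelism >= 1.
        double relativeIncrease =
                (newCumulativeParallelism - currentCumulativeParallelism)
                        / (double) currentCumulativeParallelism;
        boolean significantIncrease = relativeIncrease >= minRelativeIncrease;
        boolean cooledDown =
                Duration.between(lastScaleUp, Instant.now()).compareTo(cooldown) >= 0;

        if (significantIncrease && cooledDown) {
            lastScaleUp = Instant.now();
            return true;
        }
        return false;
    }
}

For example, new MinimumIncreaseScaleUpController(0.1, Duration.ofMinutes(1)) would only agree to a restart if the cumulative parallelism grows by at least 10% and the previous rescale happened more than a minute ago.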