Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org> The design looks generally good to me. I have 2 concerns though:
# performance on failover
I can see that an ExecutionGraph will be built and initialized on each task failure. This process may be slow:
1. Building the topology can be very slow (much slower than the scheduling or restarting process) when the job scale becomes large (see FLINK-20612). Luckily, FLINK-21110 will improve it.
2. Initializing the input/output formats can be slow due to possible IO work against external services. Maybe we should extract it out of the ExecutionGraph building?

# execution history lost
After building and using a new ExecutionGraph, the execution history in the old graph will be lost, including the status timestamps of the job, prior execution attempts and their failures. Should we let the new EG inherit this information from the prior EG? Maybe as a future plan, with further discussion regarding the varying parallelism.

Thanks,
Zhu

Xintong Song <tonysong...@gmail.com> wrote on Sunday, January 24, 2021 at 11:55 AM:

> Thanks for preparing the FLIP and starting the discussion, Till.
>
> I have a few questions trying to understand the design.
>
> ## What is the relationship between the current and new schedulers?
> IIUC, the new declarative scheduler will coexist with the current
> scheduler, as an alternative that the user needs to explicitly switch to.
> Then does it require any changes to the scheduler interfaces and to how the
> JobMaster interacts with it?
>
> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> Looking at the interfaces, it seems to me that `SlotAllocator` only takes
> `JobInformation` and `VertexInformation` as topology inputs. However, it
> generates `ParallelismAndResourceAssignments`, which maps slots to
> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
> generated outside `SlotAllocator` while `ExecutionVertexID`s are generated
> inside `SlotAllocator`.
>
> ## About `ScaleUpController#canScaleUp`
>
> ### What is cumulative parallelism?
> The interface shows that the cumulative parallelism is a single number per
> job graph. I assume this is the sum of the parallelism of all vertices?
>
> ### Is this interface expected to be called before or after
> `SlotAllocator#determineParallelism`?
> IIUC, when new resources appear, the scheduler always calls
> `SlotAllocator#determineParallelism` to generate a new plan based on the
> available resources, and `ScaleUpController#canScaleUp` is then called to
> decide whether to apply the new plan, according to whether the increase
> is significant, how long the job has been running since the last restart, etc.
> Is that correct?
>
> ### The cumulative parallelism may not be enough for deciding whether the
> job should be scaled up.
> I'm assuming my understanding of the above questions is correct. Please
> correct me if not.
>
> I noticed Chesnay's comment on the wiki page about making the decision
> based on when the job last entered the execution state.
>
> In addition to that, there could also be situations where jobs may not
> benefit much from an increase in cumulative parallelism.
> E.g., for a topology A -> B, where A and B are in different slot sharing
> groups and the current parallelism of both A and B is 2: when 1 new slot
> appears, `SlotAllocator` may suggest increasing the parallelism of A to 3. But
> this may not be a significantly beneficial change for the job, because the
> overall performance is still bottlenecked by the parallelism of B.
>
> ## Cluster vs. Job configuration
> I'm not entirely sure about specifying which scheduler to use via a
> cluster-level configuration option. Each job in a Flink cluster has its own
> JobMaster, and which scheduler to use is internal to that JobMaster. I
> understand that the declarative scheduler requires the cluster to use
> declarative resource management. However, other jobs should still be
> allowed to use other scheduler implementations that also support
> declarative resource management (e.g. the current `DefaultScheduler`).
>
> Maybe we should treat the cluster-level configuration option as the
> default scheduler, and allow the job to specify a different scheduler in
> its execution config. This is similar to how we specify which state backend
> to use.
>
> ## Minor: It might be better to also show in the state machine figure that
> it can go from `Executing` to `Restarting` when new resources appear.
>
> Thank you~
>
> Xintong Song
>
> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > Till, thanks a lot for the proposal.
> >
> > Even if the initial phase is only to support scale-up, maybe the
> > "ScaleUpController" interface should be called "RescaleController" so that
> > scale-down can be added in the future.
> >
> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about adding a new type of scheduler to
> > > Flink. The declarative scheduler will first declare the required resources
> > > and wait for them before deciding on the actual parallelism of a job.
> > > Thereby it can better handle situations where resources cannot be fully
> > > fulfilled. Moreover, it will act as a building block for the reactive mode,
> > > where Flink should scale to the maximum of the currently available
> > > resources.
> > >
> > > Please find more details in the FLIP wiki document [1]. Looking forward to
> > > your feedback.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> > >
> > > Cheers,
> > > Till
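To make Xintong's A -> B example concrete, here is a minimal Java sketch. It assumes, as Xintong asks, that "cumulative parallelism" is the sum of all vertex parallelisms, and it uses a simplifying model in which a pipeline's throughput is bounded by its lowest-parallelism vertex. The class and method names (`cumulativeParallelism`, `bottleneckParallelism`, `worthScalingUp`) are hypothetical illustrations, not part of the FLIP's `ScaleUpController` or `SlotAllocator` interfaces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: none of these names exist in Flink or in the FLIP.
public class CumulativeParallelismSketch {

    // Assumption: "cumulative parallelism" = sum of the parallelism of all vertices.
    static int cumulativeParallelism(Map<String, Integer> parallelismByVertex) {
        int sum = 0;
        for (int p : parallelismByVertex.values()) {
            sum += p;
        }
        return sum;
    }

    // Simplifying assumption: in a pipeline such as A -> B, overall throughput
    // is bounded by the vertex with the lowest parallelism.
    static int bottleneckParallelism(Map<String, Integer> parallelismByVertex) {
        int min = Integer.MAX_VALUE;
        for (int p : parallelismByVertex.values()) {
            min = Math.min(min, p);
        }
        return min;
    }

    // Hypothetical decision rule: only rescale if the bottleneck improves,
    // not merely because the cumulative parallelism grew.
    static boolean worthScalingUp(Map<String, Integer> current, Map<String, Integer> proposed) {
        return bottleneckParallelism(proposed) > bottleneckParallelism(current);
    }

    public static void main(String[] args) {
        // A and B in different slot sharing groups, both at parallelism 2.
        Map<String, Integer> current = new LinkedHashMap<>();
        current.put("A", 2);
        current.put("B", 2);

        // One new slot appears and the allocator suggests raising A to 3.
        Map<String, Integer> proposed = new LinkedHashMap<>(current);
        proposed.put("A", 3);

        System.out.println("cumulative: " + cumulativeParallelism(current)
                + " -> " + cumulativeParallelism(proposed)); // cumulative: 4 -> 5
        System.out.println("bottleneck: " + bottleneckParallelism(current)
                + " -> " + bottleneckParallelism(proposed)); // bottleneck: 2 -> 2
        System.out.println("worth scaling up: " + worthScalingUp(current, proposed)); // false
    }
}
```

The cumulative parallelism grows from 4 to 5 while the bottleneck stays at 2, which is the gap Xintong points out. A real rule would presumably also need the timing criterion from Chesnay's comment (how long the job has been running since its last restart); this sketch only isolates the bottleneck argument.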