Thanks for preparing the FLIP and starting the discussion, Till. I have a few questions trying to understand the design.
## What is the relationship between the current and new schedulers?

IIUC, the new declarative scheduler will coexist with the current scheduler, as an alternative that the user needs to explicitly switch to. If that is the case, does it require any changes to the scheduler interfaces or to how the JobMaster interacts with the scheduler?

## Is `SlotAllocator` aware of `ExecutionGraph`?

Looking at the interfaces, it seems to me that `SlotAllocator` only takes `JobInformation` and `VertexInformation` as topology inputs. However, it generates `ParallelismAndResourceAssignments`, which maps slots to `ExecutionVertexID`s. It's a bit confusing that the `ExecutionGraph` is generated outside `SlotAllocator` while the `ExecutionVertexID`s are generated inside it (see the first sketch at the end of this mail).

## About `ScaleUpController#canScaleUp`

### What is cumulative parallelism?

The interface shows that the cumulative parallelism is a single number per job graph. I assume this is the sum of the parallelism of all vertices?

### Is this interface expected to be called before or after `SlotAllocator#determineParallelism`?

IIUC, when new resources appear, the scheduler always calls `SlotAllocator#determineParallelism` to generate a new plan based on the available resources, and `ScaleUpController#canScaleUp` is then called to decide whether to apply the new plan, based on whether the increase is significant, how long the job has been running since the last restart, etc. Is that correct? (See the second sketch at the end of this mail for the flow I have in mind.)

### The cumulative parallelism may not be enough for deciding whether the job should be scaled up.

I'm assuming my understanding of the above questions is correct. Please correct me if not. I noticed Chesnay's comment on the wiki page about making the decision based on when the job last entered the execution state. In addition to that, there could also be situations where a job does not benefit much from an increase in cumulative parallelism. E.g., for a topology A -> B, where A and B are in different slot sharing groups and the current parallelism of both A and B is 2: when 1 new slot appears, `SlotAllocator` may suggest increasing the parallelism of A to 3. But this may not be a significantly beneficial change for the job, because the overall performance is still bottlenecked by the parallelism of B.

## Cluster vs. Job configuration

I'm not entirely sure about specifying which scheduler to use via a cluster-level configuration option. Each job in a Flink cluster has its own JobMaster, and which scheduler to use is internal to that JobMaster. I understand that the declarative scheduler requires the cluster to use declarative resource management. However, other jobs should still be allowed to use other scheduler implementations that also support declarative resource management (e.g. the current `DefaultScheduler`). Maybe we should treat the cluster-level configuration option as the default scheduler, and allow a job to specify a different scheduler in its execution config. This is similar to how we specify which state backend to use (see the third sketch at the end of this mail).

## Minor:

It might be better to also show in the state machine figure that it can go from `Executing` to `Restarting` when new resources appear.
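To make the `SlotAllocator` question more concrete, here is a rough sketch of how I'm currently reading those interfaces. The method name and the placeholder types are my own simplification of what the FLIP describes, not the exact proposal:

```java
import java.util.Collection;
import java.util.Map;

// Placeholder types standing in for the corresponding Flink classes.
interface VertexInformation { /* vertex id, parallelism, slot sharing group, ... */ }
interface JobInformation { Collection<VertexInformation> getVertices(); }
interface SlotInfo { /* a slot offered to the JobMaster */ }
interface ExecutionVertexID { /* id of a parallel subtask, normally owned by the ExecutionGraph */ }

// Topology input is limited to JobInformation / VertexInformation ...
interface SlotAllocator {
    ParallelismAndResourceAssignments determineParallelismAndAssignResources(
            JobInformation jobInformation, Collection<SlotInfo> freeSlots);
}

// ... yet the output already refers to ExecutionVertexIDs, although the ExecutionGraph
// that owns those ids is built outside of the SlotAllocator.
interface ParallelismAndResourceAssignments {
    Map<ExecutionVertexID, SlotInfo> getAssignedSlots();
}
```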
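And this is the control flow I'm assuming around `ScaleUpController#canScaleUp` when new slots show up. Apart from `canScaleUp` itself (whose parameters are also just my paraphrase of the FLIP), all names here are made up for illustration:

```java
interface ScaleUpController {
    // Cumulative parallelism = sum of the parallelism of all vertices (my assumption).
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

class ScaleUpFlowSketch {

    /** Invoked by the scheduler when new slots have become available. */
    static void onNewResourcesAvailable(ScaleUpController scaleUpController,
                                        int currentCumulativeParallelism) {
        // 1. SlotAllocator#determineParallelism computes a new plan from all currently
        //    available slots, yielding a (potentially higher) cumulative parallelism.
        int newCumulativeParallelism = cumulativeParallelismOfNewPlan();

        // 2. ScaleUpController#canScaleUp then decides whether applying that plan is worth
        //    a restart (significant increase, time since the last restart, ...).
        if (scaleUpController.canScaleUp(currentCumulativeParallelism, newCumulativeParallelism)) {
            // 3. Go to Restarting and bring the job up with the new parallelism.
        }
    }

    /** Placeholder for the result of SlotAllocator#determineParallelism. */
    static int cumulativeParallelismOfNewPlan() {
        return 0;
    }
}
```

If that ordering is right, then the `ScaleUpController` only ever sees the two cumulative numbers, which is what the bottleneck example above is concerned about.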
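Finally, for the cluster vs. job configuration point, this is the kind of per-job override I have in mind, by analogy with how a job can already override the cluster-wide default state backend. The scheduler setter in the comment is purely hypothetical and does not exist; it is only meant to illustrate the idea:

```java
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SchedulerSelectionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Today, a job can already override the cluster-wide default state backend:
        env.setStateBackend(new MemoryStateBackend());

        // Analogously, the cluster-level scheduler option could act only as a default,
        // with a per-job override in the execution config (hypothetical API, does not exist):
        // env.setSchedulerType(SchedulerType.DECLARATIVE);
    }
}
```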
Thank you~

Xintong Song

On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com> wrote:

> Till, thanks a lot for the proposal.
>
> Even if the initial phase is only to support scale-up, maybe the
> "ScaleUpController" interface should be called "RescaleController" so that
> in the future scale-down can be added.
>
> On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > I would like to start a discussion about adding a new type of scheduler
> > to Flink. The declarative scheduler will first declare the required
> > resources and wait for them before deciding on the actual parallelism
> > of a job. Thereby it can better handle situations where resources
> > cannot be fully fulfilled. Moreover, it will act as a building block
> > for the reactive mode where Flink should scale to the maximum of the
> > currently available resources.
> >
> > Please find more details in the FLIP wiki document [1]. Looking forward
> > to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >
> > Cheers,
> > Till
> >