Thank you very much for the comments so far. @Steven:
No fixed parallelism for any of the operators > > Regarding this limitation, can the scheduler only adjust the default > parallelism? if some operators set parallelism explicitly (like always 1), > just leave them unchanged. We will respect the configured maxParallelism for that purpose. If you have an operator that is not intended to run in parallel, you can set maxParalellism=1. @Xintong: the cluster configuration option will limit us from having jobs running > with different execution modes in the same session cluster. I'm not sure if it makes sense to support reactive mode in a session cluster ever. For an autoscaling mode, it probably makes sense (as we can just combine the resource requests from all running jobs, and distribute the available resources proportional to the requested resources). I will state more clearly in the FLIP that the configuration options should be marked as experimental. Active resource managers [...] If this is the only concern, I'd like to bring the configuration option > `slotmanager.number-of-slots.max` to your attention. I understand and agree that it would be really nice to support active resource managers with the new scheduler right away. In my opinion, reactive mode will never be really supported by active resource managers, as this is a contradiction with the idea of reactive mode: It is explicitly designed to allow controlling the cluster from the outside (similar to Kafka streams, where you add and remove capacity for scaling). Integration with active resource managers should be added in a autoscaling mode, based on the declarative scheduler. I've considered the slotmanager.number-of-slots.max option as well, but it basically means that your cluster will always immediately scale up to slotmanager.number-of-slots.max and stick to that value, even if those resources are not needed. On YARN, it would be pretty difficult or even impossible to control the scale of such a Flink deployment from the outside (using a queue with the capacity scheduler won't work, as changes to queues require restarts) On K8s, one would have to build a custom tool that finds the deployment created by Flink and adjusts it. Then, it's probably easier to just create a standalone deployment on K8s. @Yang: It will be better to make the 10 seconds to be configurable. I agree that it is pretty bold to have such an important configuration parameter hardcoded. We proposed it like this to keep the first implementation as simple as possible. But if we see that basically everybody is asking for this, or if we have time left at the end of the release cycle, we'll make it configurable. but also the ScalingPolicy is not exposed to the users now Exposing the ScalingPolicy to the user is very high on our priority list, but we want to keep the first version as simple as possible, to be able to deliver the overall feature in time, and to collect some initial user feedback before coming up with an interface we want to expose. On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <danrtsey...@gmail.com> wrote: > Thanks Robert for creating this FLIP and starting the discussion. > > This is a great start point to make Flink work with auto scaling service. > The reactive mode > is very useful in containerized environment(e.g. docker, Kubernetes). For > example, combined > with Kubernetes "Horizontal Pod Autoscaler"[1], the TaskManagers could be > started/released > dynamically based on the system metrics(e.g. cpu, memory) and custom > metrics(e.g. delay, latency). > > > > Once the job has started running, and a TaskManager is lost, it will wait > > for 10 seconds for the > > TaskManager to re-appear. > > It will be better to make the 10 seconds to be configurable. According to > our production experience > on Kubernetes, 10 seconds is not enough for a pod to be relaunched. Maybe > this is also a specific > case whether the resource is stable or not. > > > Active ResourceManager > > IIUC, the reason why reactive mode could not work with active resource > manager is not only > about requesting infinite amount of resources, but also the ScalingPolicy > is not exposed to the > users now. ScalingPolicy could be the bridge between reactive mode and > active resource manager. > User could have their own auto scaling service, which monitor the Flink > metrics and then update > the ScalingPolicy(e.g. parallelism 10 -> 20). Then the active resource > manager could allocate these > TaskManagers. > But it is out the scope of this FLIP, I really expect this could be done in > the future. And it will be another > great step to make Flink auto scalable. > > > > [1]. > https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/ > > > Best, > Yang > > > Xintong Song <tonysong...@gmail.com> 于2021年1月25日周一 上午10:29写道: > > > Thanks for preparing the FLIP and starting the discussion, Robert. > > > > ## Cluster vs. Job configuration > > As I have commented on the FLIP-160 discussion thread [1], I'm a bit > unsure > > about activating the reactive execution mode via a cluster level > > configuration option. I'm aware that in the first step this feature does > > not support session clusters. However, I think that does not mean it > won't > > be supported in future. In that case, the cluster configuration option > will > > limit us from having jobs running with different execution modes in the > > same session cluster. > > > > ## Active resource managers > > According to the FLIP, this feature explicitly does not support active > > resource managers. IIUC, this is because when in this feature the job > > requests an infinite amount of resources, which would flood Kubernetes / > > Yarn / Mesos with unreasonably large number of resource requests. If this > > is the only concern, I'd like to bring the configuration option > > `slotmanager.number-of-slots.max` to your attention. This feature allows > > putting an upper limit to the total number of slots the Flink cluster > uses, > > preventing active resource managers from allocating too many resources > from > > Kubernetes / Yarn / Mesos. Unless there are other concerns that I > > overlooked, I think it would be nicer for the reactive mode to also > support > > active resource managers, with the additional requirement to explicitly > > configure the max slots. > > > > Thank you~ > > > > Xintong Song > > > > > > [1] > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <stevenz...@gmail.com> wrote: > > > > > Thanks a lot for the proposal, Robert and Till. > > > > > > > No fixed parallelism for any of the operators > > > > > > Regarding this limitation, can the scheduler only adjust the default > > > parallelism? if some operators set parallelism explicitly (like always > > 1), > > > just leave them unchanged. > > > > > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <rmetz...@apache.org> > > > wrote: > > > > > > > Hi all, > > > > > > > > Till started a discussion about FLIP-160: Declarative scheduler [1] > > > earlier > > > > today, the first major feature based on that effort will be FLIP-159: > > > > Reactive Mode. It allows users to operate Flink in a way that it > > > reactively > > > > scales the job up or down depending on the provided resources: adding > > > > TaskManagers will scale the job up, removing them will scale it down > > > again. > > > > > > > > Here's the link to the Wiki: > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode > > > > > > > > We are very excited to hear your feedback about the proposal! > > > > > > > > Best, > > > > Robert > > > > > > > > [1] > > > > > > > > > > > > > > https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E > > > > > > > > > >