Thanks for the explanation, Robert. Now I see how these things are expected to be supported in steps.
I think you are right. Demanding a fixed finite amount of resources can be considered a special case of `ScalingPolicy`. I'm now good with the current scope of reactive mode as a first step, and with supporting active resource managers via an autoscaling mode after stabilizing FLIP-159/160.

Thank you~

Xintong Song

On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rmetz...@apache.org> wrote:

> Thanks for your thoughts Xintong! What you are writing is very valuable feedback for me, as I have limited experience with real-world deployments. It seems that autoscaling support is a really important follow-up.
>
> ## active resource managers
>
> I guess you can consider reactive mode a special case of the more generic autoscaling mode. Once we extend the interfaces in the declarative scheduler to allow autoscaling mode, the scenarios you are describing become possible.
> We already had some ideas for extended interfaces that would cover a great variety of cases. We could allow the policy to determine the number of desired slots, and propose a parallelism assignment based on that to the policy. This would also work with making calls to external services to decide the scale, etc.
> However, implementing FLIP-159 and FLIP-160 might take quite a bit of time, to stabilize all the corner cases. Once that is done, we'll publish a FLIP with an advanced interface for autoscaling.
>
> On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <tonysong...@gmail.com> wrote:
>
> > ## configuration option
> >
> > I see your point that autoscaling mode might be more suitable for session clusters. It doesn't change that `execution-mode` could be a job-level configuration. But I'm good with keeping it cluster-level and marking it experimental at the moment, so we can change it later if needed for the autoscaling mode.
> >
> > ## active resource managers
> >
> > I'm a bit confused about the boundary between reactive mode and autoscaling mode.
> > - Reactive mode requests an infinite amount of resources, and executes at the largest parallelism that is possible with the available resources.
> > - Autoscaling mode dynamically adjusts the resource demand, and executes at a parallelism that is either the demanded one, or as large as possible if the demanded parallelism cannot be reached.
> > - What about something in between? A job that is not capable of dynamically adjusting its resource demand requests a fixed finite amount of resources, and still wants to be executed with as large a parallelism as possible if the demanded parallelism cannot be reached.
> >
> > It's quite common that a job may temporarily not get as many resources as desired, due to other higher-priority jobs running in the Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user needs to configure the job with a different parallelism, or the job cannot be executed. It would be helpful if the job could execute with a lower parallelism, and automatically scale up to the originally desired parallelism when more resources become available.
> >
> > For Yarn, there's comprehensive queue-based resource quota management, where how many resources each job gets is closely related to other jobs' resource requirements. For Kubernetes, while the default kube-scheduler does not have such mature multi-tenant support, there are other projects (e.g., Apache YuniKorn [1]) that can bring similar scheduling capabilities to Kubernetes.
> >
> > Thank you~
> >
> > Xintong Song
> >
> > [1] https://yunikorn.apache.org/
> >
> > On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rmetz...@apache.org> wrote:
> >
> > > Thank you very much for the comments so far.
> > >
> > > @Steven:
> > >
> > > > No fixed parallelism for any of the operators
> > > >
> > > > Regarding this limitation, can the scheduler only adjust the default parallelism? If some operators set parallelism explicitly (like always 1), just leave them unchanged.
> > >
> > > We will respect the configured maxParallelism for that purpose. If you have an operator that is not intended to run in parallel, you can set maxParallelism=1.
> > >
> > > @Xintong:
> > >
> > > > the cluster configuration option will limit us from having jobs running with different execution modes in the same session cluster.
> > >
> > > I'm not sure it ever makes sense to support reactive mode in a session cluster. For an autoscaling mode, it probably makes sense (as we can just combine the resource requests from all running jobs, and distribute the available resources proportionally to the requested resources).
> > >
> > > I will state more clearly in the FLIP that the configuration options should be marked as experimental.
> > >
> > > > Active resource managers
> > > > [...]
> > > > If this is the only concern, I'd like to bring the configuration option `slotmanager.number-of-slots.max` to your attention.
> > >
> > > I understand and agree that it would be really nice to support active resource managers with the new scheduler right away. In my opinion, reactive mode will never be really supported by active resource managers, as that would contradict the idea of reactive mode: it is explicitly designed to allow controlling the cluster from the outside (similar to Kafka Streams, where you add and remove capacity for scaling). Integration with active resource managers should be added in an autoscaling mode, based on the declarative scheduler.
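The scaling rule Robert describes above can be sketched in a few lines. This is a hypothetical illustration with made-up names (`effectiveParallelism` is not an actual Flink API), not the declarative scheduler's real code: reactive mode runs each operator at the largest parallelism the available slots allow, capped by the operator's configured maxParallelism, so maxParallelism=1 effectively pins an operator.

```java
// Hypothetical sketch, not actual Flink scheduler code: reactive mode
// runs each operator at the largest parallelism the available slots
// allow, capped by the operator's configured maxParallelism. Setting
// maxParallelism=1 therefore pins an operator to a single task.
public class ReactiveParallelismSketch {

    static int effectiveParallelism(int availableSlots, int maxParallelism) {
        return Math.min(availableSlots, maxParallelism);
    }

    public static void main(String[] args) {
        // With 8 slots, an ordinary operator scales up to 8 ...
        System.out.println(effectiveParallelism(8, 128)); // 8
        // ... while an operator pinned via maxParallelism=1 stays at 1.
        System.out.println(effectiveParallelism(8, 1));   // 1
        // If fewer slots are available than desired, run at what exists.
        System.out.println(effectiveParallelism(3, 128)); // 3
    }
}
```

Adding or removing TaskManagers changes `availableSlots`, which is how the job scales up and down without any resubmission.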
> > > I've considered the slotmanager.number-of-slots.max option as well, but it basically means that your cluster will always immediately scale up to slotmanager.number-of-slots.max and stick to that value, even if those resources are not needed.
> > > On YARN, it would be pretty difficult or even impossible to control the scale of such a Flink deployment from the outside (using a queue with the capacity scheduler won't work, as changes to queues require restarts).
> > > On K8s, one would have to build a custom tool that finds the deployment created by Flink and adjusts it. Then, it's probably easier to just create a standalone deployment on K8s.
> > >
> > > @Yang:
> > >
> > > > It will be better to make the 10 seconds configurable.
> > >
> > > I agree that it is pretty bold to have such an important configuration parameter hardcoded. We proposed it like this to keep the first implementation as simple as possible. But if we see that basically everybody is asking for this, or if we have time left at the end of the release cycle, we'll make it configurable.
> > >
> > > > but also the ScalingPolicy is not exposed to the users now
> > >
> > > Exposing the ScalingPolicy to the user is very high on our priority list, but we want to keep the first version as simple as possible, to be able to deliver the overall feature in time, and to collect some initial user feedback before coming up with an interface we want to expose.
> > >
> > > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <danrtsey...@gmail.com> wrote:
> > >
> > > > Thanks Robert for creating this FLIP and starting the discussion.
> > > >
> > > > This is a great starting point to make Flink work with an autoscaling service. The reactive mode is very useful in containerized environments (e.g. Docker, Kubernetes).
> > > > For example, combined with the Kubernetes "Horizontal Pod Autoscaler" [1], the TaskManagers could be started/released dynamically based on system metrics (e.g. CPU, memory) and custom metrics (e.g. delay, latency).
> > > >
> > > > > Once the job has started running, and a TaskManager is lost, it will wait for 10 seconds for the TaskManager to re-appear.
> > > >
> > > > It will be better to make the 10 seconds configurable. According to our production experience on Kubernetes, 10 seconds is not enough for a pod to be relaunched. Maybe this also depends on the specific case, i.e. whether the resources are stable or not.
> > > >
> > > > > Active ResourceManager
> > > >
> > > > IIUC, the reason why reactive mode could not work with an active resource manager is not only that it requests an infinite amount of resources, but also that the ScalingPolicy is not exposed to the users now. ScalingPolicy could be the bridge between reactive mode and an active resource manager. Users could have their own autoscaling service, which monitors the Flink metrics and then updates the ScalingPolicy (e.g. parallelism 10 -> 20). Then the active resource manager could allocate these TaskManagers.
> > > > But that is out of the scope of this FLIP; I really expect this could be done in the future. It will be another great step to make Flink auto-scalable.
> > > >
> > > > [1] https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > On Mon, Jan 25, 2021 at 10:29 AM, Xintong Song <tonysong...@gmail.com> wrote:
> > > >
> > > > > Thanks for preparing the FLIP and starting the discussion, Robert.
> > > > >
> > > > > ## Cluster vs. Job configuration
> > > > >
> > > > > As I have commented on the FLIP-160 discussion thread [1], I'm a bit unsure about activating the reactive execution mode via a cluster-level configuration option. I'm aware that in the first step this feature does not support session clusters. However, I think that does not mean it won't be supported in the future. In that case, the cluster configuration option will limit us from having jobs running with different execution modes in the same session cluster.
> > > > >
> > > > > ## Active resource managers
> > > > >
> > > > > According to the FLIP, this feature explicitly does not support active resource managers. IIUC, this is because in this feature the job requests an infinite amount of resources, which would flood Kubernetes / Yarn / Mesos with an unreasonably large number of resource requests. If this is the only concern, I'd like to bring the configuration option `slotmanager.number-of-slots.max` to your attention. This option allows putting an upper limit on the total number of slots the Flink cluster uses, preventing active resource managers from allocating too many resources from Kubernetes / Yarn / Mesos. Unless there are other concerns that I have overlooked, I think it would be nicer for the reactive mode to also support active resource managers, with the additional requirement to explicitly configure the max slots.
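The capping behaviour Xintong refers to can be sketched as follows. This is a hypothetical illustration (the class and method names are made up, not Flink's actual SlotManager code): with a max-slots limit configured, reactive mode's effectively "infinite" slot demand would be clamped before any requests reach the active resource manager.

```java
// Hypothetical illustration, not actual Flink SlotManager code: with
// slotmanager.number-of-slots.max configured, the effectively
// "infinite" slot demand of reactive mode would be clamped before any
// requests reach the active resource manager (Kubernetes/Yarn/Mesos).
public class SlotCapSketch {

    // Reactive mode's "infinite" demand, modeled as Integer.MAX_VALUE.
    static final int INFINITE = Integer.MAX_VALUE;

    static int slotsToRequest(int demandedSlots, int maxSlots) {
        return Math.min(demandedSlots, maxSlots);
    }

    public static void main(String[] args) {
        // With slotmanager.number-of-slots.max = 100, the resource manager
        // sees at most 100 slot requests instead of an unbounded number.
        System.out.println(slotsToRequest(INFINITE, 100)); // 100
        // A finite demand below the cap passes through unchanged.
        System.out.println(slotsToRequest(40, 100));       // 40
    }
}
```

As Robert notes in his reply, the trade-off is that the cluster then immediately scales to the cap and stays there, whether or not the resources are needed.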
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > > [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > >
> > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > >
> > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > >
> > > > > > > No fixed parallelism for any of the operators
> > > > > >
> > > > > > Regarding this limitation, can the scheduler only adjust the default parallelism? If some operators set parallelism explicitly (like always 1), just leave them unchanged.
> > > > > >
> > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <rmetz...@apache.org> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Till started a discussion about FLIP-160: Declarative scheduler [1] earlier today; the first major feature based on that effort will be FLIP-159: Reactive Mode. It allows users to operate Flink in a way that it reactively scales the job up or down depending on the provided resources: adding TaskManagers will scale the job up, removing them will scale it down again.
> > > > > > >
> > > > > > > Here's the link to the Wiki:
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > >
> > > > > > > We are very excited to hear your feedback about the proposal!
> > > > > > > Best,
> > > > > > > Robert
> > > > > > >
> > > > > > > [1] https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E