The discussion has been open for 6 days, and it seems that all questions and concerns raised so far have been addressed.
I will start a VOTE thread for FLIP-159 now.

On Tue, Jan 26, 2021 at 3:45 PM Yang Wang <danrtsey...@gmail.com> wrote:

Thanks Robert and Till for the thorough explanation.

Now I understand the key difference between reactive mode and auto scaling mode. For the latter, we could dynamically adjust the desired value based on monitoring the metrics (e.g. cpu, memory, latency, delay, etc.). Since reactive mode is simpler and a special case of auto scaling mode, I am also fine with the current scope of this FLIP.

Best,
Yang

On Tue, Jan 26, 2021 at 6:09 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks a lot for all the feedback Steven, Yang Wang and Xintong. I have a few more comments to add.

# Keep it simple and stupid

As Robert said, we would like to keep the new feature initially as simple as possible in order to implement it quickly. Once we have a basic implementation, we want to reach out to our users to try it out and give us feedback on the future development direction. We hope to create a better feature by involving our users as early as possible. That's why the proposal is quite barebone.

Yang Wang's proposal to make the timeout value configurable sounds very simple to me and might already improve usability a lot without causing much implementation effort. Hence, I think it would be a good idea to include this feature in the initial design.

# Cluster vs. job configuration

I am not entirely sure whether the execution mode is a job-level configuration. I think about it more like a deployment/execution/run option, because one and the same job A can be executed with a fixed parallelism on a session cluster or using the reactive mode. Unfortunately, we don't have this kind of distinction at the moment. Consequently, the idea was to first introduce a cluster-level configuration which might be ignored or cause a fatal error when used with the wrong deployment.

# Active resource managers

As Robert explained, the reactive mode is not designed for active resource managers. However, by activating only the declarative scheduler for an active deployment, we should be able to run a job (streaming jobs only for the time being) even if the active RM could not allocate all the required resources, as you've described, Xintong.

# Auto-scaling

Auto-scaling, which allows Flink jobs to control their resources, will be a super helpful feature. I believe that we can build auto-scaling on the declarative scheduler similar to how the reactive mode uses it. The main difference between auto-scaling and the reactive mode is that the Flink job needs to decide on the desired number of slots. In the reactive mode we say that the desired value is "infinity", whereas an auto-scaled job is able to dynamically adjust the desired value. However, auto-scaling will not be part of this FLIP.
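To make that difference concrete, here is a very rough sketch (all names are made up for illustration; none of this is part of FLIP-159/160):

    // Illustrative sketch only; neither the interface nor the class exist in Flink.
    interface ScalingPolicy {
        // How many slots the job would like to have at this point in time.
        int getDesiredSlots();
    }

    // Reactive mode as a special case: the desired value is "infinity",
    // so the job always runs at the largest parallelism the available
    // TaskManagers allow.
    class ReactivePolicy implements ScalingPolicy {
        @Override
        public int getDesiredSlots() {
            return Integer.MAX_VALUE;
        }
    }

An auto-scaled job would instead return a value that it keeps adjusting based on metrics, which is exactly the part that is out of scope here.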
Cheers,
Till

On Tue, Jan 26, 2021 at 10:36 AM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the explanation, Robert.

Now I see how these things are expected to be supported in steps.

I think you are right. Demanding a fixed finite amount of resources can be considered a special case of `ScalingPolicy`. I'm now good with the current scope of reactive mode as a first step, and with supporting active resource managers with an autoscaling mode after stabilizing FLIP-159/160.

Thank you~

Xintong Song

On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rmetz...@apache.org> wrote:

Thanks for your thoughts Xintong! What you are writing is very valuable feedback for me, as I have limited experience with real-world deployments. It seems that autoscaling support is a really important follow-up.

## active resource managers

I guess you can consider reactive mode a special case of the more generic autoscaling mode. Once we extend the interfaces in the declarative scheduler to allow an autoscaling mode, the scenarios you are describing are possible.
We already had some ideas for extended interfaces that would cover a great variety of cases. We could allow the policy to determine the number of desired slots, and propose a parallelism assignment based on that to the policy. This would also work with making calls to external services to decide the scale, etc.
However, implementing FLIP-159 and FLIP-160 and stabilizing all the corner cases might take quite a bit of time. Once that is done, we'll publish a FLIP with an advanced interface for autoscaling.

On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <tonysong...@gmail.com> wrote:

## configuration option

I see your point that autoscaling mode might be more suitable for session clusters. It doesn't change that `execution-mode` could be a job-level configuration. But I'm good with keeping it cluster-level and marking it experimental at the moment, so we can change it later if needed for the autoscaling mode.

## active resource managers

I'm a bit confused about the boundary between reactive mode and autoscaling mode.
- Reactive mode requests an infinite amount of resources, and executes at the largest parallelism that is possible with the available resources.
- Autoscaling mode dynamically adjusts the resource demand, and executes at a parallelism that is either the demanded one or as large as possible if the demanded parallelism cannot be reached.
- What about something in between? A job that is not capable of dynamically adjusting its resource demand and requests a fixed finite amount of resources, but still wants to be executed with as large a parallelism as possible if the demanded parallelism cannot be reached?

It's quite common that a job temporarily does not get as many resources as desired, due to other higher-priority jobs running in the Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user needs to configure the job with a different parallelism, or the job cannot be executed. It would be helpful if the job could execute with a lower parallelism and automatically scale up to the original desired parallelism when more resources become available.
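Just to illustrate what I mean (this is not an interface from the FLIP; the class and method here are made up), such an "in between" job would declare a finite demand but still accept running below it:

    // Illustrative sketch only; not an actual FLIP-159/160 interface.
    class FixedDemandPolicy {
        private final int desiredParallelism;

        FixedDemandPolicy(int desiredParallelism) {
            this.desiredParallelism = desiredParallelism;
        }

        // Run with whatever is currently available, but never ask for more than
        // the fixed demand; when slots come back, the job scales up again.
        int decideParallelism(int availableSlots) {
            return Math.min(desiredParallelism, availableSlots);
        }
    }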
For Yarn, there's comprehensive queue-based resource quota management, where how many resources each job gets is closely related to other jobs' resource requirements. For Kubernetes, while the default kube-scheduler does not have such mature multi-tenant support, there are other projects (e.g., Apache YuniKorn [1]) that can bring similar scheduling capabilities to Kubernetes.

Thank you~

Xintong Song

[1] https://yunikorn.apache.org/

On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rmetz...@apache.org> wrote:

Thank you very much for the comments so far.

@Steven:

> No fixed parallelism for any of the operators
>
> Regarding this limitation, can the scheduler only adjust the default parallelism? If some operators set their parallelism explicitly (like always 1), just leave them unchanged.

We will respect the configured maxParallelism for that purpose. If you have an operator that is not intended to run in parallel, you can set maxParallelism=1.
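For example, using the existing DataStream API (nothing here is new in this FLIP; the job is just a toy to show where the cap goes):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MaxParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> source = env.fromElements("a", "b", "c");

            // This operator must not run in parallel, so cap it at 1.
            // The scheduler can then scale all other operators freely,
            // but will never run this one with a parallelism above 1.
            source.map(String::toUpperCase)
                    .setMaxParallelism(1)
                    .print();

            env.execute("max-parallelism example");
        }
    }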
@Xintong:

> the cluster configuration option will limit us from having jobs running with different execution modes in the same session cluster.

I'm not sure if it makes sense to ever support reactive mode in a session cluster. For an autoscaling mode, it probably makes sense (as we can just combine the resource requests from all running jobs, and distribute the available resources proportionally to the requested resources).

I will state more clearly in the FLIP that the configuration options should be marked as experimental.

> Active resource managers
> [...]
> If this is the only concern, I'd like to bring the configuration option `slotmanager.number-of-slots.max` to your attention.

I understand and agree that it would be really nice to support active resource managers with the new scheduler right away. In my opinion, reactive mode will never be really supported by active resource managers, as this contradicts the idea of reactive mode: it is explicitly designed to allow controlling the cluster from the outside (similar to Kafka Streams, where you add and remove capacity for scaling). Integration with active resource managers should be added in an autoscaling mode, based on the declarative scheduler.
I've considered the slotmanager.number-of-slots.max option as well, but it basically means that your cluster will always immediately scale up to slotmanager.number-of-slots.max and stick to that value, even if those resources are not needed.
On YARN, it would be pretty difficult or even impossible to control the scale of such a Flink deployment from the outside (using a queue with the capacity scheduler won't work, as changes to queues require restarts).
On K8s, one would have to build a custom tool that finds the deployment created by Flink and adjusts it. Then, it's probably easier to just create a standalone deployment on K8s.

@Yang:

> It would be better to make the 10 seconds configurable.

I agree that it is pretty bold to have such an important configuration parameter hardcoded. We proposed it like this to keep the first implementation as simple as possible.
But if we see that basically everybody is asking for this, or if we have time left at the end of the release cycle, we'll make it configurable.

> but also that the ScalingPolicy is not exposed to the users now

Exposing the ScalingPolicy to the user is very high on our priority list, but we want to keep the first version as simple as possible, to be able to deliver the overall feature in time, and to collect some initial user feedback before committing to an interface we want to expose.

On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <danrtsey...@gmail.com> wrote:

Thanks Robert for creating this FLIP and starting the discussion.

This is a great starting point for making Flink work with an auto scaling service. The reactive mode is very useful in containerized environments (e.g. Docker, Kubernetes). For example, combined with the Kubernetes "Horizontal Pod Autoscaler" [1], the TaskManagers could be started/released dynamically based on system metrics (e.g. cpu, memory) and custom metrics (e.g. delay, latency).

> Once the job has started running, and a TaskManager is lost, it will wait for 10 seconds for the TaskManager to re-appear.

It would be better to make the 10 seconds configurable. According to our production experience on Kubernetes, 10 seconds is not enough for a pod to be relaunched. Maybe this also depends on the specific case and on whether the resources are stable or not.

> Active ResourceManager

IIUC, the reason why reactive mode could not work with an active resource manager is not only that it requests an infinite amount of resources, but also that the ScalingPolicy is not exposed to the users now. The ScalingPolicy could be the bridge between reactive mode and an active resource manager.
Users could have their own auto scaling service, which monitors the Flink metrics and then updates the ScalingPolicy (e.g. parallelism 10 -> 20). The active resource manager could then allocate these TaskManagers.
But this is out of the scope of this FLIP; I really hope this can be done in the future. It will be another great step towards making Flink auto scalable.
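To sketch the idea (no such API exists in Flink today; everything below is made up for illustration), such a service boils down to a decision function that watches a metric and picks a new parallelism:

    // Illustrative only: an external autoscaler deciding a new parallelism
    // (e.g. 10 -> 20) from a single metric. How the decision would reach
    // Flink (some future ScalingPolicy hook) is exactly what is not defined yet.
    class ExternalAutoscalerSketch {
        int decideParallelism(double recordsLaggingPerSecond, int currentParallelism) {
            if (recordsLaggingPerSecond > 1000) {
                return currentParallelism * 2;              // scale out, e.g. 10 -> 20
            }
            if (recordsLaggingPerSecond < 100 && currentParallelism > 1) {
                return Math.max(1, currentParallelism / 2); // scale back in
            }
            return currentParallelism;                      // keep as is
        }
    }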
[1] https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/

Best,
Yang

On Mon, Jan 25, 2021 at 10:29 AM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for preparing the FLIP and starting the discussion, Robert.

## Cluster vs. Job configuration
As I have commented on the FLIP-160 discussion thread [1], I'm a bit unsure about activating the reactive execution mode via a cluster-level configuration option. I'm aware that in the first step this feature does not support session clusters. However, I think that does not mean it won't be supported in the future. In that case, the cluster configuration option will limit us from having jobs running with different execution modes in the same session cluster.

## Active resource managers
According to the FLIP, this feature explicitly does not support active resource managers. IIUC, this is because in this feature the job requests an infinite amount of resources, which would flood Kubernetes / Yarn / Mesos with an unreasonably large number of resource requests. If this is the only concern, I'd like to bring the configuration option `slotmanager.number-of-slots.max` to your attention. This option allows putting an upper limit on the total number of slots the Flink cluster uses, preventing active resource managers from allocating too many resources from Kubernetes / Yarn / Mesos. Unless there are other concerns that I overlooked, I think it would be nicer for the reactive mode to also support active resource managers, with the additional requirement to explicitly configure the max slots.
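For reference, this is an existing option that is normally set in flink-conf.yaml; the small programmatic form below only shows the key and an example value (100 is not a recommendation):

    import org.apache.flink.configuration.Configuration;

    public class MaxSlotsExample {
        public static void main(String[] args) {
            // slotmanager.number-of-slots.max caps the total number of slots the
            // cluster will allocate; 100 is only an example value. In practice this
            // would go into flink-conf.yaml rather than being set in code.
            Configuration config = new Configuration();
            config.setString("slotmanager.number-of-slots.max", "100");

            System.out.println(config);
        }
    }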
Thank you~

Xintong Song

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html

On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <stevenz...@gmail.com> wrote:

Thanks a lot for the proposal, Robert and Till.

> No fixed parallelism for any of the operators

Regarding this limitation, can the scheduler only adjust the default parallelism? If some operators set their parallelism explicitly (like always 1), just leave them unchanged.

On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <rmetz...@apache.org> wrote:

Hi all,

Till started a discussion about FLIP-160: Declarative scheduler [1] earlier today. The first major feature based on that effort will be FLIP-159: Reactive Mode. It allows users to operate Flink in a way that it reactively scales the job up or down depending on the provided resources: adding TaskManagers will scale the job up, removing them will scale it down again.

Here's the link to the Wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode

We are very excited to hear your feedback about the proposal!

Best,
Robert

[1] https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E