The discussion has been open for 6 days, and it seems that all questions
and concerns raised so far have been addressed.

I will start a VOTE thread for FLIP-159 now.

On Tue, Jan 26, 2021 at 3:45 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks Robert and Till for the thorough explanation.
>
> Now I understand the key difference between the reactive mode and the
> auto-scaling mode. For the latter, we could dynamically adjust the desired
> value based on monitoring metrics (e.g. CPU, memory, latency, delay, etc.).
> Since the reactive mode is simpler and a special case of the auto-scaling
> mode, I am also fine with the current scope of this FLIP.
>
>
> Best,
> Yang
>
> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Jan 26, 2021 at 6:09 PM:
>
> > Thanks a lot for all the feedback, Steven, Yang Wang and Xintong. I have
> > a few more comments to add.
> >
> > # Keep it simple and stupid
> >
> > As Robert said, we would like to keep the new feature initially as simple
> > as possible in order to implement it quickly. Once we have a basic
> > implementation, we want to reach out to our users to try it out and give
> > us feedback on the future development direction. We hope to create a
> > better feature by involving our users as early as possible. That's why
> > the proposal is quite barebones.
> >
> > Yang Wang's proposal to make the timeout value configurable sounds very
> > simple to me and might already improve usability big time w/o causing a
> > lot of implementation effort. Hence, I think it would be a good idea to
> > include this feature in the initial design.
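> >
> > To make that concrete, such a setting could end up looking roughly like
> > the following in flink-conf.yaml (the key name and value here are only
> > placeholders for illustration; nothing has been decided yet):
> >
> >     # hypothetical key: how long to wait for lost or not-yet-registered
> >     # TaskManagers before (re)scaling the job with the available resources
> >     declarative-scheduler.resource-wait-timeout: 10 s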
> >
> > # Cluster vs. job configuration
> >
> > I am not entirely sure whether the execution mode is a job-level
> > configuration. I think of it more as a deployment/execution/run option,
> > because one and the same job A can be executed with a fixed parallelism
> > on a session cluster or using the reactive mode. Unfortunately, we don't
> > have this kind of distinction at the moment. Consequently, the idea was
> > to first introduce a cluster-level configuration which might be ignored
> > or cause a fatal error when used with the wrong deployment.
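> >
> > As a rough sketch, such a cluster-level switch might look like this in
> > flink-conf.yaml (the key and value names are only assumptions for
> > illustration, not the final design):
> >
> >     # hypothetical cluster-level option selecting the execution mode;
> >     # a session cluster would reject or ignore it for now
> >     scheduler-mode: reactive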
> >
> > # Active resource managers
> >
> > As Robert explained, the reactive mode is not designed for active
> > resource managers. However, by activating only the declarative scheduler
> > for an active deployment, we should be able to run a job (streaming jobs
> > only for the time being) even if the active RM could not allocate all the
> > required resources, as you've described, Xintong.
> >
> > # Auto-scaling
> >
> > Auto-scaling, which allows Flink jobs to control their own resources,
> > will be a super helpful feature. I believe that we can build auto-scaling
> > using the declarative scheduler, similar to how the reactive mode uses
> > it. The main difference between auto-scaling and the reactive mode is
> > that the Flink job needs to decide on the desired number of slots. In the
> > reactive mode we say that the desired value is "infinity", whereas an
> > auto-scaled job is able to dynamically adjust the desired value. However,
> > auto-scaling will not be part of this FLIP.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 26, 2021 at 10:36 AM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> > > Thanks for the explanation, Robert.
> > >
> > > Now I see how these things are expected to be supported in steps.
> > >
> > > I think you are right. Demanding a fixed finite amount of resources can
> > > be considered a special case of `ScalingPolicy`. I'm now good with the
> > > current scope of the reactive mode as a first step, and with supporting
> > > active resource managers via an autoscaling mode after stabilizing
> > > FLIP-159/160.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rmetz...@apache.org>
> > > wrote:
> > >
> > > > Thanks for your thoughts, Xintong! What you are writing is very
> > > > valuable feedback for me, as I have limited experience with real-world
> > > > deployments. It seems that autoscaling support is a really important
> > > > follow-up.
> > > >
> > > > ## active resource managers
> > > >
> > > > I guess you can consider the reactive mode a special case of the more
> > > > generic autoscaling mode. Once we extend the interfaces in the
> > > > declarative scheduler to allow an autoscaling mode, the scenarios you
> > > > are describing become possible.
> > > > We already had some ideas for extended interfaces that would cover a
> > > > great variety of cases. We could allow the policy to determine the
> > > > number of desired slots, and propose a parallelism assignment based on
> > > > that to the policy. This would also work with making calls to external
> > > > services to decide the scale, etc.
> > > > However, implementing FLIP-159 and FLIP-160 might take quite a bit of
> > > > time to stabilize all the corner cases. Once that is done, we'll
> > > > publish a FLIP with an advanced interface for autoscaling.
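> > > >
> > > > To illustrate the kind of interface we have in mind, a very rough
> > > > sketch (the name and signatures below are purely hypothetical, not
> > > > the actual FLIP-160 API) could look like this:
> > > >
> > > >     /** Purely hypothetical sketch of a pluggable scaling policy. */
> > > >     public interface ScalingPolicy {
> > > >
> > > >         /**
> > > >          * Number of slots the job would like to have right now; the
> > > >          * reactive mode would effectively return "infinity", while an
> > > >          * autoscaling policy could call out to an external service.
> > > >          */
> > > >         int getDesiredSlots(int currentlyAvailableSlots);
> > > >
> > > >         /**
> > > >          * Whether to accept the parallelism the scheduler proposes
> > > >          * for the currently available resources.
> > > >          */
> > > >         boolean acceptProposedParallelism(int proposedParallelism);
> > > >     }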
> > > >
> > > > On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <tonysong...@gmail.com>
> > > > wrote:
> > > >
> > > > > ## configuration option
> > > > >
> > > > > I see your point that the autoscaling mode might be more suitable
> > > > > for session clusters. It doesn't change that `execution-mode` could
> > > > > be a job-level configuration. But I'm good with keeping it
> > > > > cluster-level and marking it experimental at the moment, so we can
> > > > > change it later if needed for the autoscaling mode.
> > > > >
> > > > > ## active resource managers
> > > > >
> > > > > I'm a bit confused about the boundary between the reactive mode and
> > > > > the autoscaling mode.
> > > > > - Reactive mode requests an infinite amount of resources, and
> > > > > executes at the largest parallelism that is possible with the
> > > > > available resources.
> > > > > - Autoscaling mode dynamically adjusts the resource demand, and
> > > > > executes at the demanded parallelism, or at as large a parallelism
> > > > > as possible if the demanded parallelism cannot be reached.
> > > > > - What about something in between? A job that is not capable of
> > > > > dynamically adjusting its resource demand requests a fixed, finite
> > > > > amount of resources, but still wants to be executed with as large a
> > > > > parallelism as possible if the demanded parallelism cannot be
> > > > > reached.
> > > > >
> > > > > It's quite common that a job may temporarily not get as many
> > > > > resources as desired, due to other higher-priority jobs running in
> > > > > the Kubernetes/Yarn/Mesos cluster. In such cases, currently either
> > > > > the user needs to configure the job with a different parallelism, or
> > > > > the job cannot be executed. It would be helpful if the job could
> > > > > execute with a lower parallelism and automatically scale up to the
> > > > > originally desired parallelism when more resources become available.
> > > > >
> > > > >
> > > > > For Yarn, there's comprehensive queue-based resource quota
> > > > > management, where how many resources each job gets is closely
> > > > > related to other jobs' resource requirements. For Kubernetes, while
> > > > > the default kube-scheduler does not have such mature multi-tenant
> > > > > support, there are other projects (e.g., Apache YuniKorn [1]) that
> > > > > can bring similar scheduling capabilities to Kubernetes.
> > > > >
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > > [1] https://yunikorn.apache.org/
> > > > >
> > > > > On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rmetz...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Thank you very much for the comments so far.
> > > > > >
> > > > > > @Steven:
> > > > > >
> > > > > > > > No fixed parallelism for any of the operators
> > > > > > >
> > > > > > > Regarding this limitation, can the scheduler only adjust the
> > > > > > > default parallelism? If some operators set parallelism
> > > > > > > explicitly (like always 1), just leave them unchanged.
> > > > > >
> > > > > >
> > > > > > We will respect the configured maxParallelism for that purpose.
> > > > > > If you have an operator that is not intended to run in parallel,
> > > > > > you can set maxParallelism=1.
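> > > > > >
> > > > > > For example, with the DataStream API this could look roughly as
> > > > > > follows (a minimal sketch; the job itself is just a placeholder):
> > > > > >
> > > > > >     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > > > >
> > > > > >     public class MaxParallelismExample {
> > > > > >         public static void main(String[] args) throws Exception {
> > > > > >             StreamExecutionEnvironment env =
> > > > > >                 StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > >
> > > > > >             env.fromElements(1, 2, 3)
> > > > > >                 .map(i -> i * 2)
> > > > > >                 // this operator must never run in parallel, so cap it
> > > > > >                 // at 1; the scheduler can still scale everything else
> > > > > >                 .setMaxParallelism(1)
> > > > > >                 .print();
> > > > > >
> > > > > >             env.execute("max-parallelism-sketch");
> > > > > >         }
> > > > > >     }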
> > > > > >
> > > > > > @Xintong:
> > > > > >
> > > > > > > the cluster configuration option will limit us from having jobs
> > > > > > > running with different execution modes in the same session
> > > > > > > cluster.
> > > > > >
> > > > > >
> > > > > > I'm not sure if it ever makes sense to support the reactive mode
> > > > > > in a session cluster. For an autoscaling mode, it probably makes
> > > > > > sense (as we can just combine the resource requests from all
> > > > > > running jobs, and distribute the available resources
> > > > > > proportionally to the requested resources).
> > > > > >
> > > > > > I will state more clearly in the FLIP that the configuration
> > > > > > options should be marked as experimental.
> > > > > >
> > > > > > > Active resource managers
> > > > > > >
> > > > > > > [...]
> > > > > > >
> > > > > > > If this is the only concern, I'd like to bring the configuration
> > > > > > > option `slotmanager.number-of-slots.max` to your attention.
> > > > > >
> > > > > >
> > > > > > I understand and agree that it would be really nice to support
> > > > > > active resource managers with the new scheduler right away. In my
> > > > > > opinion, reactive mode will never really be supported by active
> > > > > > resource managers, as this contradicts the idea of reactive mode:
> > > > > > it is explicitly designed to allow controlling the cluster from
> > > > > > the outside (similar to Kafka Streams, where you add and remove
> > > > > > capacity for scaling). Integration with active resource managers
> > > > > > should be added in an autoscaling mode, based on the declarative
> > > > > > scheduler.
> > > > > > I've considered the slotmanager.number-of-slots.max option as
> > > > > > well, but it basically means that your cluster will always
> > > > > > immediately scale up to slotmanager.number-of-slots.max and stick
> > > > > > to that value, even if those resources are not needed.
> > > > > > On YARN, it would be pretty difficult or even impossible to
> > > > > > control the scale of such a Flink deployment from the outside
> > > > > > (using a queue with the capacity scheduler won't work, as changes
> > > > > > to queues require restarts).
> > > > > > On K8s, one would have to build a custom tool that finds the
> > > > > > deployment created by Flink and adjusts it. Then, it's probably
> > > > > > easier to just create a standalone deployment on K8s.
> > > > > >
> > > > > > @Yang:
> > > > > >
> > > > > > > It would be better to make the 10 seconds configurable.
> > > > > >
> > > > > >
> > > > > > I agree that it is pretty bold to have such an important
> > > > > > configuration parameter hardcoded. We proposed it like this to
> > > > > > keep the first implementation as simple as possible.
> > > > > > But if we see that basically everybody is asking for this, or if
> > > > > > we have time left at the end of the release cycle, we'll make it
> > > > > > configurable.
> > > > > >
> > > > > >
> > > > > > > but also the ScalingPolicy is not exposed to the users now
> > > > > >
> > > > > >
> > > > > > Exposing the ScalingPolicy to the user is very high on our
> > > > > > priority list, but we want to keep the first version as simple as
> > > > > > possible, to be able to deliver the overall feature in time, and
> > > > > > to collect some initial user feedback before coming up with an
> > > > > > interface we want to expose.
> > > > > >
> > > > > > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <danrtsey...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks Robert for creating this FLIP and starting the
> > > > > > > discussion.
> > > > > > >
> > > > > > > This is a great starting point to make Flink work with an
> > > > > > > auto-scaling service. The reactive mode is very useful in
> > > > > > > containerized environments (e.g. Docker, Kubernetes). For
> > > > > > > example, combined with the Kubernetes "Horizontal Pod
> > > > > > > Autoscaler" [1], the TaskManagers could be started/released
> > > > > > > dynamically based on system metrics (e.g. CPU, memory) and
> > > > > > > custom metrics (e.g. delay, latency).
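> > > > > > >
> > > > > > > As an illustration, a standard HPA definition targeting the
> > > > > > > TaskManager deployment could look roughly like this (the
> > > > > > > deployment name and the thresholds are just assumptions for
> > > > > > > this sketch):
> > > > > > >
> > > > > > >     apiVersion: autoscaling/v1
> > > > > > >     kind: HorizontalPodAutoscaler
> > > > > > >     metadata:
> > > > > > >       name: flink-taskmanager
> > > > > > >     spec:
> > > > > > >       scaleTargetRef:
> > > > > > >         apiVersion: apps/v1
> > > > > > >         kind: Deployment
> > > > > > >         name: flink-taskmanager
> > > > > > >       minReplicas: 2
> > > > > > >       maxReplicas: 10
> > > > > > >       targetCPUUtilizationPercentage: 80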
> > > > > > >
> > > > > > >
> > > > > > > > Once the job has started running, and a TaskManager is lost,
> > > > > > > > it will wait for 10 seconds for the TaskManager to re-appear.
> > > > > > >
> > > > > > > It would be better to make the 10 seconds configurable.
> > > > > > > According to our production experience on Kubernetes, 10
> > > > > > > seconds is not enough for a pod to be relaunched. Maybe this
> > > > > > > also depends on the specific case, i.e. whether the resources
> > > > > > > are stable or not.
> > > > > > >
> > > > > > > > Active ResourceManager
> > > > > > >
> > > > > > > IIUC, the reason why the reactive mode could not work with an
> > > > > > > active resource manager is not only that it requests an
> > > > > > > infinite amount of resources, but also that the ScalingPolicy
> > > > > > > is not exposed to the users now. The ScalingPolicy could be the
> > > > > > > bridge between the reactive mode and the active resource
> > > > > > > manager. Users could have their own auto-scaling service, which
> > > > > > > monitors the Flink metrics and then updates the ScalingPolicy
> > > > > > > (e.g. parallelism 10 -> 20). Then the active resource manager
> > > > > > > could allocate the corresponding TaskManagers.
> > > > > > > But that is out of the scope of this FLIP; I really hope this
> > > > > > > can be done in the future. It would be another great step
> > > > > > > towards making Flink auto-scalable.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > [1].
> > > > > > > https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yang
> > > > > > >
> > > > > > >
> > > > > > > Xintong Song <tonysong...@gmail.com> wrote on Mon, Jan 25, 2021 at 10:29 AM:
> > > > > > >
> > > > > > > > Thanks for preparing the FLIP and starting the discussion,
> > > > > > > > Robert.
> > > > > > > >
> > > > > > > > ## Cluster vs. Job configuration
> > > > > > > > As I have commented on the FLIP-160 discussion thread [1],
> > > > > > > > I'm a bit unsure about activating the reactive execution mode
> > > > > > > > via a cluster-level configuration option. I'm aware that in
> > > > > > > > the first step this feature does not support session
> > > > > > > > clusters. However, I think that does not mean it won't be
> > > > > > > > supported in the future. In that case, the cluster
> > > > > > > > configuration option will limit us from having jobs running
> > > > > > > > with different execution modes in the same session cluster.
> > > > > > > >
> > > > > > > > ## Active resource managers
> > > > > > > > According to the FLIP, this feature explicitly does not
> > > > > > > > support active resource managers. IIUC, this is because in
> > > > > > > > this feature the job requests an infinite amount of
> > > > > > > > resources, which would flood Kubernetes / Yarn / Mesos with
> > > > > > > > an unreasonably large number of resource requests. If this is
> > > > > > > > the only concern, I'd like to bring the configuration option
> > > > > > > > `slotmanager.number-of-slots.max` to your attention. This
> > > > > > > > option allows putting an upper limit on the total number of
> > > > > > > > slots the Flink cluster uses, preventing active resource
> > > > > > > > managers from allocating too many resources from
> > > > > > > > Kubernetes / Yarn / Mesos. Unless there are other concerns
> > > > > > > > that I overlooked, I think it would be nicer for the reactive
> > > > > > > > mode to also support active resource managers, with the
> > > > > > > > additional requirement to explicitly configure the max slots.
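> > > > > > > >
> > > > > > > > For example, in flink-conf.yaml (the value is just an
> > > > > > > > arbitrary number for illustration):
> > > > > > > >
> > > > > > > >     # cap the total number of slots the cluster may allocate
> > > > > > > >     slotmanager.number-of-slots.max: 20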
> > > > > > > >
> > > > > > > > Thank you~
> > > > > > > >
> > > > > > > > Xintong Song
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > > > > >
> > > > > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <stevenz...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > > > > >
> > > > > > > > > > No fixed parallelism for any of the operators
> > > > > > > > >
> > > > > > > > > Regarding this limitation, can the scheduler only adjust
> > > > > > > > > the default parallelism? If some operators set parallelism
> > > > > > > > > explicitly (like always 1), just leave them unchanged.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <rmetz...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > Till started a discussion about FLIP-160: Declarative
> > > > > > > > > > scheduler [1] earlier today; the first major feature
> > > > > > > > > > based on that effort will be FLIP-159: Reactive Mode. It
> > > > > > > > > > allows users to operate Flink in a way that reactively
> > > > > > > > > > scales the job up or down depending on the provided
> > > > > > > > > > resources: adding TaskManagers will scale the job up,
> > > > > > > > > > removing them will scale it down again.
> > > > > > > > > >
> > > > > > > > > > Here's the link to the Wiki:
> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > > > > >
> > > > > > > > > > We are very excited to hear your feedback about the
> > > > > > > > > > proposal!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Robert
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > > https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
