Thanks for the explanation, Robert.

Now I see how these features are expected to be supported step by step.

I think you are right. Demanding a fixed, finite amount of resources can be
considered a special case of `ScalingPolicy`. I'm now good with the current
scope of reactive mode as a first step, and with supporting active resource
managers via the autoscaling mode after FLIP-159/160 have stabilized.
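To make the "special case" concrete, here is a minimal Java sketch. `ScalingPolicy`, `ReactivePolicy`, `FixedDemandPolicy` and `parallelism` are hypothetical names for illustration only, not Flink's actual interfaces, and the sketch assumes one slot per unit of parallelism. Reactive mode is modeled as an unbounded demand; a fixed, finite demand uses the same code path but is capped at its demand, so it runs lower when resources are short and scales back up to its target when they return.

```java
import java.util.OptionalInt;

/** Hypothetical sketch; NOT Flink's actual ScalingPolicy API. */
public class ScalingPolicySketch {

    /** A policy states how many slots a job wants; empty means "as many as possible". */
    interface ScalingPolicy {
        OptionalInt desiredSlots();
    }

    /** Reactive mode: unbounded demand, so the job uses whatever is available. */
    static final class ReactivePolicy implements ScalingPolicy {
        public OptionalInt desiredSlots() {
            return OptionalInt.empty();
        }
    }

    /** The "in between" case: a fixed, finite demand. */
    static final class FixedDemandPolicy implements ScalingPolicy {
        private final int slots;

        FixedDemandPolicy(int slots) {
            this.slots = slots;
        }

        public OptionalInt desiredSlots() {
            return OptionalInt.of(slots);
        }
    }

    /** Runs at the demand (if any), capped by available slots and by maxParallelism. */
    static int parallelism(ScalingPolicy policy, int availableSlots, int maxParallelism) {
        int demand = policy.desiredSlots().orElse(Integer.MAX_VALUE);
        return Math.min(Math.min(demand, availableSlots), maxParallelism);
    }

    public static void main(String[] args) {
        ScalingPolicy reactive = new ReactivePolicy();
        ScalingPolicy fixed = new FixedDemandPolicy(20);
        // Only 12 slots available, e.g. because higher-priority jobs hold the rest.
        System.out.println(parallelism(reactive, 12, 128)); // 12
        System.out.println(parallelism(fixed, 12, 128));    // 12
        // More slots appear: the fixed-demand job scales up only to its demand.
        System.out.println(parallelism(reactive, 30, 128)); // 30
        System.out.println(parallelism(fixed, 30, 128));    // 20
        // An operator with maxParallelism=1 never runs in parallel.
        System.out.println(parallelism(reactive, 30, 1));   // 1
    }
}
```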

Thank you~

Xintong Song



On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rmetz...@apache.org> wrote:

> Thanks for your thoughts Xintong! What you are writing is very valuable
> feedback for me, as I have limited experience with real-world deployments.
> It seems that autoscaling support is a really important follow up.
>
> ## active resource managers
>
> I guess you can consider reactive mode a special case of the more generic
> autoscaling mode. Once we extend the interfaces in the declarative
> scheduler to allow autoscaling mode, the scenarios you are describing are
> possible.
> We already had some ideas for extended interfaces that would cover a
> great variety of cases. We could allow the policy to determine the number
> of desired slots, and then propose a parallelism assignment based on that
> back to the policy. This would also allow the policy to call external
> services to decide on the scale, etc.
> However, implementing FLIP-159 and FLIP-160 and stabilizing all the corner
> cases might take quite a bit of time. Once that is done, we'll publish a
> FLIP with an advanced interface for autoscaling.
>
> On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <tonysong...@gmail.com> wrote:
>
> > ## configuration option
> >
> > I see your point that autoscaling mode might be more suitable for session
> > clusters. That doesn't change the fact that `execution-mode` could be a
> > job-level configuration. But I'm good with keeping it cluster-level and
> > marking it experimental for now, so we can change it later if needed for
> > the autoscaling mode.
> >
> > ## active resource managers
> >
> > I'm a bit confused about the boundary between reactive mode and
> > autoscaling mode.
> > - Reactive mode requests an infinite amount of resources, and executes at
> > the largest parallelism that is possible with the available resources.
> > - Autoscaling mode dynamically adjusts resource demand, and executes at a
> > parallelism that is either demanded or as large as possible if the
> > demanded parallelism cannot be reached.
> > - What about something in between? A job that is not capable of
> > dynamically adjusting its resource demand requests a fixed, finite amount
> > of resources, but still wants to be executed with as large a parallelism
> > as possible if the demanded parallelism cannot be reached?
> >
> > It's quite common that a job temporarily does not get as many resources
> > as desired, because other, higher-priority jobs are running in the
> > Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user
> > needs to configure the job with a different parallelism, or the job cannot
> > be executed. It would be helpful if the job could execute with a lower
> > parallelism, and automatically scale up to the originally desired
> > parallelism when more resources become available.
> >
> >
> > For Yarn, there's comprehensive queue-based resource quota management,
> > where the amount of resources each job gets is closely related to other
> > jobs' resource requirements. For Kubernetes, while the default
> > kube-scheduler does not have such mature multi-tenant support, there are
> > other projects (e.g., Apache YuniKorn [1]) that can bring similar
> > scheduling capabilities to Kubernetes.
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1] https://yunikorn.apache.org/
> >
> > On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rmetz...@apache.org> wrote:
> >
> > > Thank you very much for the comments so far.
> > >
> > > @Steven:
> > >
> > > No fixed parallelism for any of the operators
> > > >
> > > > Regarding this limitation, can the scheduler only adjust the default
> > > > parallelism? If some operators set parallelism explicitly (like always
> > > > 1), just leave them unchanged.
> > >
> > >
> > > We will respect the configured maxParallelism for that purpose. If you
> > > have an operator that is not intended to run in parallel, you can set
> > > maxParallelism=1.
> > >
> > > @Xintong:
> > >
> > > > the cluster configuration option will prevent us from having jobs
> > > > running with different execution modes in the same session cluster.
> > >
> > >
> > > I'm not sure it will ever make sense to support reactive mode in a
> > > session cluster. For an autoscaling mode, it probably makes sense (as we
> > > can just combine the resource requests from all running jobs, and
> > > distribute the available resources proportionally to the requested
> > > resources).
> > >
> > > I will state more clearly in the FLIP that the configuration options
> > > should be marked as experimental.
> > >
> > > Active resource managers
> > >
> > > [...]
> > >
> > > > If this is the only concern, I'd like to bring the configuration option
> > > > `slotmanager.number-of-slots.max` to your attention.
> > >
> > >
> > > I understand and agree that it would be really nice to support active
> > > resource managers with the new scheduler right away. In my opinion,
> > > reactive mode will never really be supported by active resource
> > > managers, as that contradicts the idea of reactive mode: it is explicitly
> > > designed to allow controlling the cluster from the outside (similar to
> > > Kafka Streams, where you add and remove capacity for scaling).
> > > Integration with active resource managers should be added in an
> > > autoscaling mode, based on the declarative scheduler.
> > > I've considered the slotmanager.number-of-slots.max option as well, but
> > > it basically means that your cluster will always immediately scale up
> > > to slotmanager.number-of-slots.max and stick to that value, even if those
> > > resources are not needed.
> > > On YARN, it would be pretty difficult or even impossible to control the
> > > scale of such a Flink deployment from the outside (using a queue with the
> > > capacity scheduler won't work, as changes to queues require restarts).
> > > On K8s, one would have to build a custom tool that finds the deployment
> > > created by Flink and adjusts it. Then, it's probably easier to just
> > > create a standalone deployment on K8s.
> > >
> > > @Yang:
> > >
> > > It would be better to make the 10 seconds configurable.
> > >
> > >
> > > I agree that it is pretty bold to have such an important configuration
> > > parameter hardcoded. We proposed it like this to keep the first
> > > implementation as simple as possible.
> > > But if we see that basically everybody is asking for this, or if we
> > > have time left at the end of the release cycle, we'll make it
> > > configurable.
> > >
> > >
> > > but also the ScalingPolicy is not exposed to the users now
> > >
> > >
> > > Exposing the ScalingPolicy to the user is very high on our priority
> > > list, but we want to keep the first version as simple as possible, to be
> > > able to deliver the overall feature in time, and to collect some initial
> > > user feedback before coming up with an interface we want to expose.
> > >
> > >
> > > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <danrtsey...@gmail.com> wrote:
> > >
> > > > Thanks Robert for creating this FLIP and starting the discussion.
> > > >
> > > > This is a great starting point to make Flink work with autoscaling
> > > > services. The reactive mode is very useful in containerized
> > > > environments (e.g. Docker, Kubernetes). For example, combined with the
> > > > Kubernetes "Horizontal Pod Autoscaler" [1], the TaskManagers could be
> > > > started/released dynamically based on system metrics (e.g. CPU,
> > > > memory) and custom metrics (e.g. delay, latency).
> > > >
> > > >
> > > > > Once the job has started running, and a TaskManager is lost, it will
> > > > > wait for 10 seconds for the TaskManager to re-appear.
> > > >
> > > > It would be better to make the 10 seconds configurable. According to
> > > > our production experience on Kubernetes, 10 seconds is not enough for a
> > > > pod to be relaunched. Maybe this also depends on whether the resources
> > > > are stable or not.
> > > >
> > > > > Active ResourceManager
> > > >
> > > > IIUC, the reason why reactive mode cannot work with active resource
> > > > managers is not only that it requests an infinite amount of resources,
> > > > but also that the ScalingPolicy is not exposed to the users now.
> > > > ScalingPolicy could be the bridge between reactive mode and active
> > > > resource managers. Users could have their own autoscaling service,
> > > > which monitors the Flink metrics and then updates the ScalingPolicy
> > > > (e.g. parallelism 10 -> 20). Then the active resource manager could
> > > > allocate these TaskManagers.
> > > > But it is out of the scope of this FLIP; I really hope this can be
> > > > done in the future. It will be another great step toward making Flink
> > > > autoscalable.
> > > >
> > > >
> > > >
> > > > [1] https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > >
> > > > On Mon, Jan 25, 2021 at 10:29 AM Xintong Song <tonysong...@gmail.com> wrote:
> > > >
> > > > > Thanks for preparing the FLIP and starting the discussion, Robert.
> > > > >
> > > > > ## Cluster vs. Job configuration
> > > > > As I have commented on the FLIP-160 discussion thread [1], I'm a bit
> > > > > unsure about activating the reactive execution mode via a
> > > > > cluster-level configuration option. I'm aware that in the first step
> > > > > this feature does not support session clusters. However, I think that
> > > > > does not mean it won't be supported in the future. In that case, the
> > > > > cluster configuration option will prevent us from having jobs running
> > > > > with different execution modes in the same session cluster.
> > > > >
> > > > > ## Active resource managers
> > > > > According to the FLIP, this feature explicitly does not support active
> > > > > resource managers. IIUC, this is because in this feature the job
> > > > > requests an infinite amount of resources, which would flood
> > > > > Kubernetes / Yarn / Mesos with an unreasonably large number of
> > > > > resource requests. If this is the only concern, I'd like to bring the
> > > > > configuration option `slotmanager.number-of-slots.max` to your
> > > > > attention. This option allows putting an upper limit on the total
> > > > > number of slots the Flink cluster uses, preventing active resource
> > > > > managers from allocating too many resources from Kubernetes / Yarn /
> > > > > Mesos. Unless there are other concerns that I overlooked, I think it
> > > > > would be nicer for the reactive mode to also support active resource
> > > > > managers, with the additional requirement to explicitly configure the
> > > > > max slots.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > > [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > >
> > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > >
> > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > >
> > > > > > > No fixed parallelism for any of the operators
> > > > > >
> > > > > > Regarding this limitation, can the scheduler only adjust the default
> > > > > > parallelism? If some operators set parallelism explicitly (like
> > > > > > always 1), just leave them unchanged.
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <rmetz...@apache.org> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Till started a discussion about FLIP-160: Declarative scheduler [1]
> > > > > > > earlier today; the first major feature based on that effort will be
> > > > > > > FLIP-159: Reactive Mode. It allows users to operate Flink in a way
> > > > > > > that reactively scales the job up or down depending on the provided
> > > > > > > resources: adding TaskManagers will scale the job up, removing them
> > > > > > > will scale it down again.
> > > > > > >
> > > > > > > Here's the link to the Wiki:
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > >
> > > > > > > We are very excited to hear your feedback about the proposal!
> > > > > > >
> > > > > > > Best,
> > > > > > > Robert
> > > > > > >
> > > > > > > [1] https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
