Hi Thomas,

Yes, I was referring to a separate repository under Apache Flink.

Cheers,

Konstantin

On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise <t...@apache.org> wrote:

> Hi everyone,
>
> Thanks for the feedback and discussion. A few additional thoughts:
>
> [Konstantin] > With respect to common lifecycle management operations:
> these features are
> > not available (within Apache Flink) for any of the other resource
> providers
> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> this
> > a shortcoming of the Kubernetes integration.
>
> I think time and evolution of the ecosystem are factors to consider as
> well. The state and usage of Flink was much different when YARN
> integration was novel. Expectations are different today and the
> lifecycle functionality provided by an operator may as well be
> considered essential to support the concept of a Flink application on
> k8s. After few years learning from operator experience outside of
> Flink it might be a good time to fill the gap.
>
> [Konstantin] > I still believe that we should keep this focus on low
> > level composable building blocks (like Jobs and Snapshots) in Apache
> Flink
> > to make it easy for everyone to build fitting higher level abstractions
> > like a FlinkApplication Custom Resource on top of it.
>
> I completely agree that it is important that the basic functions of
> Flink are solid and continued focus is necessary. Thanks for sharing
> the pointers, these are great improvements. At the same time,
> ecosystem, contributor base and user spectrum are growing. There have
> been significant additions in many areas of Flink including connectors
> and higher level abstractions like statefun, SQL and Python. It's also
> evident from additional repositories/subprojects that we have in Flink
> today.
>
> [Konstantin] > Having said this, if others in the community have the
> capacity to push and
> > *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
> > Flink, I don't see any blockers. If or when this happens, I'd see some
> > clear benefits of using a separate repository (easier independent
> > versioning and releases, different build system & tooling (go, I
> assume)).
>
> Naturally different contributors to the project have different focus.
> Let's find out if there is strong enough interest to take this on and
> strong enough commitment to maintain. As I see it, there is a
> tremendous amount of internal investment going into operationalizing
> Flink within many companies. Improvements to the operational side of
> Flink like the operator would complement Flink nicely. I assume that
> you are referring to a separate repository within Apache Flink, which
> would give it the chance to achieve better sustainability than the
> existing external operator efforts. There is also the fact that some
> organizations which are heavily invested in operationalizing Flink are
> allowing contributing to Apache Flink itself but less so to arbitrary
> github projects. Regarding the tooling, it could well turn out that
> Java is a good alternative given the ecosystem focus and that there is
> an opportunity for reuse in certain aspects (metrics, logging etc.).
>
> [Yang] > I think Xintong has given a strong point why we introduced
> the native K8s integration, which is active resource management.
> > I have a concrete example for this in the production. When a K8s node is
> down, the standalone K8s deployment will take longer
> > recovery time based on the K8s eviction time(IIRC, default is 5
> minutes). For the native K8s integration, Flink RM could be aware of the
> > TM heartbeat lost and allocate a new one timely.
>
> Thanks for sharing this, we should evaluate it as part of a proposal.
> If we can optimize recovery or scaling with active resource management
> then perhaps it is worth to support it through the operator.
> Previously mentioned operators all rely on the standalone model.
>
> Cheers,
> Thomas
>
> On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf <kna...@apache.org>
> wrote:
> >
> > cc dev@
> >
> > Hi Thomas, Hi everyone,
> >
> > Thank you for starting this discussion and sorry for chiming in late.
> >
> > I agree with Thomas' and David's assessment of Flink's "Native Kubernetes
> > Integration", in particular, it does actually not integrate well with the
> > Kubernetes ecosystem despite being called "native" (tooling, security
> > concerns).
> >
> > With respect to common lifecycle management operations: these features
> are
> > not available (within Apache Flink) for any of the other resource
> providers
> > (YARN, Standalone) either. From this perspective, I wouldn't consider
> this
> > a shortcoming of the Kubernetes integration. Instead, we have been
> focusing
> > our efforts in Apache Flink on the operations of a single Job, and left
> > orchestration and lifecycle management that spans multiple Jobs to
> > ecosystem projects. I still believe that we should keep this focus on low
> > level composable building blocks (like Jobs and Snapshots) in Apache
> Flink
> > to make it easy for everyone to build fitting higher level abstractions
> > like a FlinkApplication Custom Resource on top of it. For example, we are
> > currently contributing multiple improvements [1,2,3,4] to the REST API
> and
> > Application Mode that in our experience will make it easier to manage
> > Apache Flink with a Kubernetes operator. Given this background, I
> suspect a
> > Kubernetes Operator in Apache Flink would not be a priority for us at
> > Ververica - at least right now.
> >
> > Having said this, if others in the community have the capacity to push
> and
> > *maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
> > Flink, I don't see any blockers. If or when this happens, I'd see some
> > clear benefits of using a separate repository (easier independent
> > versioning and releases, different build system & tooling (go, I
> assume)).
> >
> > Looking forward to your thoughts,
> >
> > Konstantin
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-24275
> > [2] https://issues.apache.org/jira/browse/FLINK-24208
> > [3]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> > [4] https://issues.apache.org/jira/browse/FLINK-24113
> >
> > On Mon, Jan 10, 2022 at 2:11 PM Gyula Fóra <gyf...@apache.org> wrote:
> >
> > > Hi All!
> > >
> > > This is a very interesting discussion.
> > >
> > > I think many users find it confusing what deployment mode to choose
> when
> > > considering a new production application on Kubernetes. With all the
> > > options of native, standalone and different operators this can get
> tricky :)
> > >
> > > I really like the idea that Thomas brought up to have at least a
> minimal
> > > operator implementation in Flink itself to cover the most common
> production
> > > job lifecycle management scenarios. I think the Flink community has a
> very
> > > strong experience in this area to create a successful implementation
> that
> > > would benefit most production users on Kubernetes.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Mon, Jan 10, 2022 at 4:29 AM Yang Wang <danrtsey...@gmail.com>
> wrote:
> > >
> > >> Thanks all for this fruitful discussion.
> > >>
> > >> I think Xintong has given a strong point why we introduced the native
> K8s
> > >> integration, which is active resource management.
> > >> I have a concrete example for this in the production. When a K8s node
> is
> > >> down, the standalone K8s deployment will take longer
> > >> recovery time based on the K8s eviction time(IIRC, default is 5
> minutes).
> > >> For the native K8s integration, Flink RM could be aware of the
> > >> TM heartbeat lost and allocate a new one timely.
> > >>
> > >> Also when introducing the native K8s integration, another hit is that
> we
> > >> should make the users are easy enough to migrate from YARN deployment.
> > >> They already have a production-ready job life-cycle management system,
> > >> which is using Flink CLI to submit the Flink jobs.
> > >> So we provide a consistent command "bin/flink run-application -t
> > >> kubernetes-application/yarn-application" to start a Flink application
> and
> > >> "bin/flink cancel/stop ..."
> > >> to terminate a Flink application.
> > >>
> > >>
> > >> Compared with K8s operator, I know that this is not a K8s
> > >> native mechanism. Hence, I also agree that we still need a powerful
> K8s
> > >> operator which
> > >> could work with both standalone and native K8s modes. The major
> > >> difference between them is how to start the JM and TM pods. For
> standalone,
> > >> they are managed by K8s job/deployment. For native, maybe we could
> simply
> > >> create a submission carrying the "flink run-application" arguments
> > >> which is derived from the Flink application CR.
> > >>
> > >> Make the Flink's active resource manager can talk to the K8s operator
> is
> > >> an interesting option, which could support both standalone and native.
> > >> Then Flink RM just needs to declare the resource requirement(e.g. 2 *
> > >> <2G, 1CPU>, 2 * <4G, 1CPU>) and defer the resource
> allocation/de-allocation
> > >> to the K8s operator. It feels like an intermediate form between native
> > >> and standalone mode :)
> > >>
> > >>
> > >>
> > >> Best,
> > >> Yang
> > >>
> > >>
> > >>
> > >> Xintong Song <tonysong...@gmail.com> 于2022年1月7日周五 12:02写道:
> > >>
> > >>> Hi folks,
> > >>>
> > >>> Thanks for the discussion. I'd like to share my two cents on this
> topic.
> > >>>
> > >>> Firstly, I'd like to clarify my understanding of the concepts "native
> > >>> k8s integration" and "active resource management".
> > >>> - Native k8s integration means Flink's master interacts with k8s' api
> > >>> server directly. It acts like embedding an operator inside Flink's
> master,
> > >>> which manages the resources (pod, deployment, configmap, etc.) and
> watches
> > >>> / reacts to related events.
> > >>> - Active resource management means Flink can actively start /
> terminate
> > >>> workers as needed. Its key characteristic is that the resource a
> Flink
> > >>> deployment uses is decided by the job's execution plan, unlike the
> opposite
> > >>> reactive mode (resource available to the deployment decides the
> execution
> > >>> plan) or the standalone mode (both execution plan and deployment
> resources
> > >>> are predefined).
> > >>>
> > >>> Currently, we have the yarn and native k8s deployments (and the
> recently
> > >>> removed mesos deployment) in active mode, due to their ability to
> request /
> > >>> release worker resources from the underlying cluster. And all the
> existing
> > >>> operators, AFAIK, work with a Flink standalone deployment, where
> Flink
> > >>> cannot request / release resources by itself.
> > >>>
> > >>> From this perspective, I think a large part of the native k8s
> > >>> integration advantages come from the active mode: being able to
> better
> > >>> understand the job's resource requirements and adjust the deployment
> > >>> resource accordingly. Both fine-grained resource management
> (customizing TM
> > >>> resources for different tasks / operators) and adaptive batch
> scheduler
> > >>> (rescale the deployment w.r.t. different stages) fall into this
> category.
> > >>>
> > >>> I'm wondering if we can have an operator that also works with the
> active
> > >>> mode. Instead of talking to the api server directly for adding /
> deleting
> > >>> resources, Flink's active resource manager can talk to the operator
> (via
> > >>> CR) about the resources the deployment needs, and let the operator to
> > >>> actually add / remove the resources. The operator should be able to
> work
> > >>> with (active) or without (standalone) the information of deployment's
> > >>> resource requirements. In this way, users are free to choose between
> active
> > >>> and reactive (e.g., HPA) rescaling, while always benefiting from the
> > >>> beyond-deployment lifecycle (upgrades, savepoint management, etc.)
> and
> > >>> alignment with the K8s ecosystem (Flink client free, operating via
> kubectl,
> > >>> etc.).
> > >>>
> > >>> Thank you~
> > >>>
> > >>> Xintong Song
> > >>>
> > >>>
> > >>>
> > >>> On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Hi David,
> > >>>>
> > >>>> Thank you for the reply and context!
> > >>>>
> > >>>> As for workload types and where native integration might fit: I
> think
> > >>>> that any k8s native solution that satisfies category 3) can also
> take
> > >>>> care of 1) and 2) while the native integration by itself can't
> achieve
> > >>>> that. Existence of [1] might serve as further indication.
> > >>>>
> > >>>> The k8s operator pattern would be an essential building block for a
> > >>>> k8s native solution that is interoperable with k8s ecosystem tooling
> > >>>> like kubectl, which is why [2] and subsequent derived art were
> > >>>> created. Specifically the CRD allows us to directly express the
> > >>>> concept of a Flink application consisting of job manager and task
> > >>>> manager pods along with associated create/update/delete operations.
> > >>>>
> > >>>> Would it make sense to gauge interest to have such an operator as
> part
> > >>>> of Flink? It appears so from discussions like [3]. I think such
> > >>>> addition would significantly lower the barrier to adoption, since
> like
> > >>>> you mentioned one cannot really run mission critical streaming
> > >>>> workloads with just the Apache Flink release binaries alone. While
> it
> > >>>> is great to have multiple k8s operators to choose from that are
> > >>>> managed outside Flink, it is unfortunately also evident that today's
> > >>>> hot operator turns into tomorrow's tech debt. I think such fate
> would
> > >>>> be less likely within the project, when multiple parties can join
> > >>>> forces and benefit from each other's contributions. There were
> similar
> > >>>> considerations and discussions around Docker images in the past.
> > >>>>
> > >>>> Out of the features that you listed it is particularly the
> application
> > >>>> upgrade that needs to be solved through an external process like
> > >>>> operator. The good thing is that many folks have already thought
> hard
> > >>>> about this and in existing implementations we see different
> strategies
> > >>>> that have their merit and production mileage (certainly applies to
> > >>>> [2]). We could combine the best of these ideas into a unified
> > >>>> implementation as part of Flink itself as starting point.
> > >>>>
> > >>>> Cheers,
> > >>>> Thomas
> > >>>>
> > >>>>
> > >>>> [1] https://github.com/wangyang0918/flink-native-k8s-operator
> > >>>> [2] https://github.com/lyft/flinkk8soperator
> > >>>> [3]
> https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
> > >>>>
> > >>>>
> > >>>> On Tue, Jan 4, 2022 at 4:04 AM David Morávek <d...@apache.org>
> wrote:
> > >>>> >
> > >>>> > Hi Thomas,
> > >>>> >
> > >>>> > AFAIK there are no specific plans in this direction with the
> native
> > >>>> integration, but I'd like to share some thoughts on the topic
> > >>>> >
> > >>>> > In my understanding there are three major groups of workloads in
> > >>>> Flink:
> > >>>> >
> > >>>> > 1) Batch workloads
> > >>>> > 2) Interactive workloads (Both Batch and Streaming; eg. SQL
> Gateway /
> > >>>> Zeppelin Notebooks / VVP ...)
> > >>>> > 3) "Mission Critical" Streaming workloads
> > >>>> >
> > >>>> > I think the native integration fits really well in the first two
> > >>>> categories. Let's talk about these first:
> > >>>> >
> > >>>> > 1) Batch workloads
> > >>>> >
> > >>>> > You don't really need to address the upgrade story here. The
> > >>>> interesting topic is how to "dynamically" adjust parallelism as the
> > >>>> workload can change between stages. This is where the Adaptive Batch
> > >>>> Scheduler [1] comes into play. To leverage the scheduler to the full
> > >>>> extend, it needs to be deployed with the remote shuffle service in
> place
> > >>>> [2], so the Flink's Resource Manager can free TaskManagers that are
> no
> > >>>> longer needed.
> > >>>> >
> > >>>> > This can IMO work really well with the native integration as
> there is
> > >>>> really clear approach on how the Resource Manager should decide on
> what
> > >>>> resources should be allocated.
> > >>>> >
> > >>>> > 2) Interactive workloads
> > >>>> >
> > >>>> > Again, the upgrade story is not really interesting in this
> scenario.
> > >>>> For batch workloads, it's basically the same as the above. For
> streaming
> > >>>> one this gets tricky. The main initiative that we current have in
> terms of
> > >>>> auto scaling / re-scaling of the streaming workloads is the
> reactive mode
> > >>>> (adaptive scheduler) [3].
> > >>>> >
> > >>>> > I can totally see how the reactive mode could be integrated in the
> > >>>> native integration, but with the application mode, which is not
> really
> > >>>> suitable for the interactive workloads. For integration with session
> > >>>> cluster, we'd first need to address the "scheduling" problem of how
> to
> > >>>> distribute newly available resources between multiple jobs.
> > >>>> >
> > >>>> > What's pretty neat here is that the interpreter (zeppelin, sql gw,
> > >>>> ...) have a really convenient way of spinning up a new cluster
> inside k8s.
> > >>>> >
> > >>>> > 3) "Mission Critical" Streaming workloads
> > >>>> >
> > >>>> > This one is IMO the primary reason why one would consider
> building a
> > >>>> new operator these days as this needs a careful lifecycle
> management of the
> > >>>> pipeline. I assume this is also the use case that you're
> investigating, am
> > >>>> I correct?
> > >>>> >
> > >>>> > I'd second the requirements that you've already stated:
> > >>>> > a) Resource efficiency - being able to re-scale based on the
> > >>>> workload, in order to keep up with the input / not waste resources
> > >>>> > b) Fast recovery
> > >>>> > c) Application upgrades
> > >>>> >
> > >>>> > I personally don't think that the native integration is really
> > >>>> suitable here. The direction that we're headed is with the
> standalone
> > >>>> deployment on Kubernetes + the reactive mode (adaptive scheduler).
> > >>>> >
> > >>>> > In theory, if we want to build a really cloud (Kubernetes) native
> > >>>> stream processor, deploying the pipeline should be as simple as
> deploying
> > >>>> any other application. It should be also simple to integrate with
> CI & CD
> > >>>> environment and the fast / frequent deploy philosophy.
> > >>>> >
> > >>>> > Let's see where we stand and where we can expand from there:
> > >>>> >
> > >>>> > a) Resource efficiency
> > >>>> >
> > >>>> > We already have the reactive mode in place. This allows you to
> add /
> > >>>> remove task managers by adjusting the TM deployment (`kubectl scale
> ...`)
> > >>>> and Flink will automatically react to the available resources. This
> is
> > >>>> currently only supported with the Application Mode, that is limited
> to a
> > >>>> single job (which should be enough for this kind of workload).
> > >>>> >
> > >>>> > The re-scaling logic is left completely up to the user and can be
> as
> > >>>> simple as setting up a HPA (Horizontal Pod Autoscaler). I tend to
> think in
> > >>>> the direction, that we might want to provide a custom k8s metrics
> server,
> > >>>> that allows HPA to query the metrics from JM, to make this more
> flexible
> > >>>> and easy to use.
> > >>>> >
> > >>>> > As this looks really great in theory, there are still some
> > >>>> shortcomings that we're actively working on addressing. For this
> feature to
> > >>>> be really widely adopted, we need to make the re-scaling experience
> as fast
> > >>>> as possible, so we can re-scale often to react to the input rate.
> This
> > >>>> could be currently a problem with large RocksDB states as this
> involves
> > >>>> full re-balance of the state (splitting / merging RocksDB
> instances). The
> > >>>> k8s operator approach has the same / even worse limitation as it
> involves
> > >>>> taking a savepoint a re-building the state from it.
> > >>>> >
> > >>>> > b) Fast recovery
> > >>>> >
> > >>>> > This is IMO not as different from the native mode (although I'd
> have
> > >>>> to check whether RM failover can reuse task managers). This involves
> > >>>> frequent and fast checkpointing, local recovery (which is still not
> > >>>> supported in reactive mode, but this will be hopefully addressed
> soon) and
> > >>>> working directory efforts [4].
> > >>>> >
> > >>>> > c) Application upgrades
> > >>>> >
> > >>>> > This is the topic I'm still struggling with a little. Historically
> > >>>> this involves external lifecycle management (savepoint + submitting
> a new
> > >>>> job). I think at the end of the day, with application mode on
> standalone
> > >>>> k8s, it could be as simple as updating the docker image of the JM
> > >>>> deployment.
> > >>>> >
> > >>>> > If I think about the simplest upgrade scenario, simple in-place
> > >>>> restore from the latest checkpoint, it may be fairly simple to
> implement.
> > >>>> What I'm struggling with are the more complex upgrade scenarios
> such as
> > >>>> dual, blue / green deployment.
> > >>>> >
> > >>>> >
> > >>>> > To sum this up, I'd really love if Flink could provide great
> out-of
> > >>>> the box experience with standalone mode on k8s, that makes the
> experience
> > >>>> as close to running / operating any other application as possible.
> > >>>> >
> > >>>> > I'd really appreciate to hear your thoughts on this topic.
> > >>>> >
> > >>>> > [1]
> > >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > >>>> > [2] https://github.com/flink-extended/flink-remote-shuffle
> > >>>> > [3]
> > >>>>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
> > >>>> > [4]
> > >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes
> > >>>> >
> > >>>> > Best,
> > >>>> > D.
> > >>>> >
> > >>>> > On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise <t...@apache.org>
> wrote:
> > >>>> >>
> > >>>> >> Hi,
> > >>>> >>
> > >>>> >> I was recently looking at the Flink native Kubernetes
> integration [1]
> > >>>> >> to get an idea how it relates to existing operator based
> solutions
> > >>>> >> [2], [3].
> > >>>> >>
> > >>>> >> Part of the native integration's motivations was simplicity (no
> extra
> > >>>> >> component to install), but arguably that is also a shortcoming.
> The
> > >>>> >> k8s operator model can offer support for application lifecycle
> like
> > >>>> >> upgrade and rescaling, as well as job submission without a Flink
> > >>>> >> client.
> > >>>> >>
> > >>>> >> When using the Flink native integration it would still be
> necessary
> > >>>> to
> > >>>> >> provide that controller functionality. Is the idea to use the
> native
> > >>>> >> integration for task manager resource allocation in tandem with
> an
> > >>>> >> operator that provides the external controller functionality? If
> > >>>> >> anyone using the Flink native integration can share experience, I
> > >>>> >> would be curious to learn more about the specific setup and if
> there
> > >>>> >> are plans to expand the k8s native integration capabilities.
> > >>>> >>
> > >>>> >> For example:
> > >>>> >>
> > >>>> >> * Application upgrade with features such as [4]. Since the job
> > >>>> manager
> > >>>> >> is part of the deployment it cannot orchestrate the deployment.
> It
> > >>>> >> needs to be the responsibility of an external process. Has anyone
> > >>>> >> contemplated adding such a component to Flink itself?
> > >>>> >>
> > >>>> >> * Rescaling: Theoretically a parallelism change could be
> performed
> > >>>> w/o
> > >>>> >> restart of the job manager pod. Hence, building blocks to
> trigger and
> > >>>> >> apply rescaling could be part of Flink itself. Has this been
> explored
> > >>>> >> further?
> > >>>> >>
> > >>>> >> Yang kindly pointed me to [5]. Is the recommendation/conclusion
> that
> > >>>> >> when a k8s operator is already used, then let it be in charge of
> the
> > >>>> >> task manager resource allocation? If so, what scenario was the
> native
> > >>>> >> k8s integration originally intended for?
> > >>>> >>
> > >>>> >> Thanks,
> > >>>> >> Thomas
> > >>>> >>
> > >>>> >> [1]
> > >>>>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
> > >>>> >> [2] https://github.com/lyft/flinkk8soperator
> > >>>> >> [3] https://github.com/spotify/flink-on-k8s-operator
> > >>>> >> [4]
> > >>>>
> https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
> > >>>> >> [5]
> https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d
> > >>>>
> > >>>
> >
> > --
> >
> > Konstantin Knauf
> >
> > https://twitter.com/snntrable
> >
> > https://github.com/knaufk
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk

Reply via email to