Hi Yi,
Thanks a lot for the explanation and the update. I still have two small
questions I'd like to get your thoughts on:

*1. On auto-scaling*

I think auto-scaling should be a first-class citizen in Flink as a whole —
and RpcOperator was proposed precisely to make this achievable for GPU
workloads. For that reason, baking a fixed parallelism into the API as the
first step doesn't feel right to me; put simply, I don't think we should
introduce an API that locks parallelism to a single value.

That said, I fully agree that a complete auto-scaling implementation is
complex and isn't the focus of this FLIP. So my suggestion is that the
first version could just support the degenerate case — e.g. min = max —
without committing the API itself to fixed parallelism.
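
A minimal sketch of that idea, assuming a hypothetical ParallelismRange
value type (the name and its methods are illustrative only; they are not
part of the FLIP or of any existing Flink API). The API accepts a range,
and fixed parallelism is just the case min = max:

```java
/** Hypothetical value type for illustration; not part of the FLIP or of Flink. */
final class ParallelismRange {
    private final int min;
    private final int max;

    private ParallelismRange(int min, int max) {
        if (min < 1 || max < min) {
            throw new IllegalArgumentException(
                    "expected 1 <= min <= max, got min=" + min + ", max=" + max);
        }
        this.min = min;
        this.max = max;
    }

    /** General form: the service may run with any parallelism in [min, max]. */
    static ParallelismRange of(int min, int max) {
        return new ParallelismRange(min, max);
    }

    /** Degenerate form: min = max, i.e. fixed parallelism. */
    static ParallelismRange fixed(int parallelism) {
        return new ParallelismRange(parallelism, parallelism);
    }

    int min() {
        return min;
    }

    int max() {
        return max;
    }

    /** True when the range collapses to a single value. */
    boolean isFixed() {
        return min == max;
    }
}
```

Under this assumption, a first version could reject any range for which
isFixed() is false and relax that check once auto-scaling is designed,
without changing the method signatures that users code against.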

*2. On the lifecycle question*

a. This is actually tied to how we express the API. What I've had in mind
is something like ds.query(new RPCOperatorService), since it leaves no room
for ambiguity: the service clearly belongs to a single job, and every job
has its own.

b. Consider this scenario: a Flink application-mode deployment containing
two jobs, where each job needs its own independent RPC service. With your
current design, would I have to call addRpcService twice?

c. If it's StreamExecutionEnvironment that interacts with the RPC operator,
that feels more like something shared *across* jobs — analogous to
registering a UDF in the Table API, where multiple jobs can use it. That
seems to sit in tension with the per-job semantics above.
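
The contrast in (a) to (c) can be sketched with a toy model. None of the
types below are Flink classes: Env, Job, and Service are stand-ins, and the
methods query and addRpcService only borrow the names used in this thread.
Their behavior here is an assumption made purely for illustration:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; these are not Flink types. */
final class ScopeSketch {

    /** Stand-in for a user-defined RPC service. */
    static final class Service {
        final String name;

        Service(String name) {
            this.name = name;
        }
    }

    /** Stand-in for a job; it owns whatever is attached through its own stream. */
    static final class Job {
        final String name;
        final List<Service> owned = new ArrayList<>();

        Job(String name) {
            this.name = name;
        }

        /** Stream-scoped style: the service is attached where it is used. */
        Job query(Service service) {
            owned.add(service);
            return this;
        }
    }

    /** Stand-in for an environment that builds several jobs in one application. */
    static final class Env {
        final List<Service> registered = new ArrayList<>();
        final List<Job> jobs = new ArrayList<>();

        /** Environment-scoped style: the call itself names no owning job. */
        void addRpcService(Service service) {
            registered.add(service);
        }

        Job newJob(String name) {
            Job job = new Job(name);
            jobs.add(job);
            return job;
        }
    }
}
```

In this model, two jobs that each call query(new Service(...)) end up owning
one service apiece, whereas a service passed to addRpcService is recorded on
the environment without being tied to either job.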

Best,
Guowei


On Tue, Jun 2, 2026 at 11:16 AM Gen Luo <[email protected]> wrote:

> Hi Yi,
>
> Thanks for this impressive proposal. Adding an RPC service deployment
> primitive can greatly expand Flink's capability scope and flexibility, with
> clear benefits for AI workloads and agent applications. It can also play a
> role in scenarios like state management or data lake compaction — relevant
> to but decoupled from the data pipeline. This is a very promising
> direction. That said, I still have some questions.
>
> 1. My biggest concern is heterogeneous resources, echoing Hongshun's Q4.
> Flink today uses homogeneous TM resources sliced for different task
> requirements, but decoupling GPU/CPU resources and enabling flexible
> independent scaling is one of the core values of this FLIP — and the
> current resource allocation approach cannot support this. We likely need
> new implementations for the resource allocation component.
>
> 2. Some components are described only at the idea level without
> implementation details. Hopefully these can be supplemented later or
> discussed in dedicated threads. A few worth discussing:
>
>   - RpcTask: It's currently described as a simple single-threaded loop,
> with no plan to use Mailbox or OperatorChain. But if we want to support
> async, checkpointing, etc. later, introducing Mailbox would be valuable:
> handle the input queue as the defaultAction, with async/checkpoint/system
> messages as mails. This keeps semantics consistent with StreamTask and
> reduces maintenance burden. System events like failover-triggered request
> cancellation can also be prioritized via the mailbox. OperatorChain is what
> SubtaskCheckpointCoordinator directly contacts, so having RpcOperator as
> the only node in the chain wouldn't add much complexity and lowers the cost
> when checkpoint support comes.
>
>   - Communication implementation: Will it reuse the Pekko + proxy
> mechanism? Pekko has risks with large message bodies (OOM, silently
> dropped oversized frames, etc.). Since ROS is user-facing, we can't fully
> avoid this, and large-model scenarios often carry sizable context, so this
> needs careful handling. Also, the current design doesn't include
> table-function-style interfaces for unfolding data, so a large unfolded
> result has to be returned as a single response, inflating per-request
> payload size and amplifying the same risk.
>
>   - Client-side implementation: Only the interface is described, no
> implementation details. Checkpoint is the prominent issue — since ROS is
> stateless by design, state recording falls to the client. Do we go with
> drain or record? Drain may take a long time; record on a keyed stream may
> cause request distribution confusion after parallelism changes.
>
> 3. ROS as container, not service: Is this allowed? If the system itself has
> service capabilities — especially non-Java stacks (agents, LLMs, also
> Python remote functions) — forwarding requests through the Flink Java layer
> adds unnecessary overhead. In that case RpcOperator would only handle
> lifecycle, while the framework manages resource scheduling and service
> discovery without carrying business traffic.
>
> nit: addRpcService is inconsistent with other places that use RpcOperator /
> the ros keyword. Worth unifying.
>
> Best,
> Gen
>
> On Mon, Jun 1, 2026 at 2:02 PM Yi Zhang <[email protected]> wrote:
>
> > Hi Guowei,
> >
> >
> > Thanks for the thoughtful review and valuable feedback! Your questions
> > are insightful and have directly driven refinements to the proposal.
> > Here are my responses to each point:
> >
> >
> > 1. Autoscaling
> > Supporting a (min, max) concurrency range is a great idea. However,
> > auto-scaling involves much more than just the API surface: it requires
> > discussion around scaling policies, coordination with the scheduler,
> > metric-driven triggers, and more. To keep this FLIP focused, I'd like to
> > defer that to a follow-up. Fixed parallelism is a common and necessary
> > capability on its own, and the first version will support that. The
> > interfaces can be extended later to include auto-scaling support without
> > breaking changes.
> >
> >
> > 2. Async invocation & end-to-end examples
> > This is a good point. The `RosClient` already provides an async
> > interface, so using it with `AsyncDataStream` is indeed more natural and
> > idiomatic. I will add an asynchronous invocation example in the
> > proposal. Regarding a complete end-to-end inference example: at the Java
> > API level, it will still be primarily conceptual, since practical model
> > inference examples really need Python API support (e.g., loading a
> > PyTorch or ONNX model). I will refine the existing example to make the
> > overall invocation chain as close to a real-world scenario as possible.
> >
> >
> > 3. GPU as a resource
> > This is indeed a key concern. The `extendedResource` API in the original
> > proposal comes from Flink's existing resource model (introduced by
> > FLIP-108). However, I agree that GPU deserves first-class treatment
> > rather than being accessed through a generic extension mechanism.
> > Promoting GPU to a first-class resource will involve changes to both the
> > API and the scheduling layer, which we plan to address in a dedicated
> > FLIP. For now, I will remove the `extendedResource` API from
> > `RosDescriptor` to keep this FLIP focused on the RPC service primitive
> > itself. The resource declaration API can be extended once the
> > GPU-specific FLIP is in place.
> >
> >
> > 4. Serial method invocation
> > Agreed. I will make the serial invocation semantics more prominent, both
> > in the FLIP documentation and in the code-level JavaDoc for
> > `RpcOperator`.
> >
> >
> > 5. Why synchronous methods?
> > This is a key question. The initial design chose synchronous methods for
> > simplicity: users don't need to worry about thread safety, and
> > concurrency is achieved by scaling out instances (parallelism). However,
> > I recognize that asynchronous processing is a critical requirement for
> > certain scenarios (e.g., when the RPC service performs expensive
> > operations that can be parallelized internally). I am planning to extend
> > the current design to support asynchronous method execution: if a user's
> > method returns a `CompletableFuture`, the framework will recognize it
> > and wait for the future to complete before sending the response, without
> > blocking the invoke thread. This allows the invoke thread to continue
> > dispatching subsequent requests, while the user controls internal
> > concurrency. Importantly, `processRequest` calls remain serial; the
> > async behavior is purely opt-in.
> >
> >
> > 6. SQL UDF usage
> > The current design is primarily oriented toward the DataStream API, as
> > it provides the foundational building blocks. I agree that SQL and Table
> > API support is an important aspect to consider as well. On the
> > invocation side, both Table API and SQL UDFs share the same runtime path
> > through `FunctionContext`, so enabling RPC service access from UDFs is
> > straightforward: I will add a `getRosClient` method to `FunctionContext`
> > and include that change in this FLIP.
> > The real challenge for SQL lies on the definition side: how to define
> > and register an RPC service using pure SQL syntax, which would require
> > SQL grammar extensions and planner changes. I think that part would be
> > better addressed in a dedicated follow-up FLIP to give it the attention
> > it deserves.
> >
> >
> > 7. Service sharing across jobs & lifecycle
> > In this FLIP, the RPC service is defined as part of the job (via
> > `env.addRpcService()` before `env.execute()`). It cannot be accessed
> > across jobs, and it is released when the job reaches a terminal state. I
> > believe your question is more relevant to the "Shared RPC Service"
> > mentioned in the Follow-up Tasks section. While it's out of scope for
> > this FLIP, let me briefly describe the envisioned design: a shared RPC
> > service would run as an independent job within the same Application,
> > accessible by all jobs in that Application. It would terminate when the
> > Application ends, or could be stopped manually via a user-facing API.
> > The embedded (per-job) service model discussed in this FLIP is simpler
> > to manage and well-suited for many streaming scenarios, which is why we
> > focus on it first. Importantly, both the embedded and shared modes will
> > reuse the same set of deployment primitives (`RpcOperator`, `RosClient`,
> > `RosDescriptor`, etc.); the difference is only in lifecycle management
> > and visibility scope.
> >
> >
> >
> >
> > I hope the above addresses your questions. I will update the proposal to
> > reflect the changes mentioned above. Please feel free to follow up if
> > you have any further questions or suggestions.
> >
> >
> > Best,
> > Yi
> >
> >
> >
> > At 2026-05-28 16:39:49, "Guowei Ma" <[email protected]> wrote:
> > >Hi Yi,
> > >
> > >One more question I'd like to raise is around how the service behaves
> > >when it's referenced by more than one job. If two jobs are submitted
> > >within the same application and both reference the same RpcOperator
> > >service, is that service shared across them, and what is the expected
> > >behavior in that case? I'm also curious how this plays out for bounded
> > >(batch) jobs in particular: if one job completes while the other is
> > >still running, what does the service lifecycle look like? Is the service
> > >torn down as soon as the first job finishes, and if so, how does that
> > >affect the job that's still running?
> > >
> > >Best,
> > >Guowei
> > >
> > >
> > >On Thu, May 28, 2026 at 2:55 PM Guowei Ma <[email protected]> wrote:
> > >
> > >> Hi Yi,
> > >>
> > >> Thanks for putting together this FLIP. It's a well-thought-out
> > >> proposal, and the RpcOperator Service is a really valuable addition
> > >> for AI/inference workloads on Flink. I have a few questions and
> > >> suggestions:
> > >>
> > >>
> > >> 1. Autoscaling
> > >> How is scale-up/scale-down expressed in this design? Could you
> > >> consider allowing concurrency to be configured as a (min, max) range?
> > >> Currently it appears only min = max (i.e. fixed parallelism) is
> > >> supported.
> > >>
> > >>
> > >> 2. Async invocation & end-to-end examples
> > >> The example shows a synchronous invocation style. I'd suggest also
> > >> providing an asynchronous invocation example, or even just a scaffold
> > >> that exposes only the RpcOperator's handle and method. In addition, it
> > >> would be best to include a complete end-to-end example of invoking a
> > >> model (CPU inference would be fine).
> > >>
> > >>
> > >> 3. GPU as a resource
> > >> GPU is surely a common/first-class resource. Wouldn't accessing it
> > >> here through an extension mechanism mislead users? It would seem more
> > >> natural to declare GPU as a standard resource rather than going
> > >> through an extension.
> > >>
> > >>
> > >> 4. Serial method invocation
> > >> Each method of an RpcOperator is invoked serially. I'd suggest stating
> > >> this explicitly in both the documentation and the code comments, so
> > >> that users (or an AI) can be clearly aware of it.
> > >>
> > >>
> > >> 5. Why synchronous methods?
> > >> Why is every method in RpcOperator a synchronous call? Does this imply
> > >> that no model inference supports concurrent invocation?
> > >>
> > >>
> > >> 6. SQL UDF usage
> > >> How would SQL UDF users invoke this capability? I'd suggest providing
> > >> a complete example for this as well.
> > >>
> > >>
> > >> Thanks again for the great work!
> > >>
> > >> Best,
> > >> Guowei
> > >>
> > >>
> > >> On Wed, May 27, 2026 at 8:36 PM Charles Zhang <[email protected]> wrote:
> > >>
> > >>> Hi Yi, and Flink Community,
> > >>>
> > >>> Thanks for bringing up this excellent proposal. I am fully in favor
> > >>> of FLIP-582.
> > >>>
> > >>> In our production workloads, especially regarding large-scale AI
> > >>> inference, the tight coupling of the data plane and compute units has
> > >>> always been a major pain point. If an inference subtask fails due to
> > >>> a GPU OOM or driver issue, triggering a global rollback is incredibly
> > >>> expensive since model reloading takes minutes.
> > >>>
> > >>> The introduction of the RpcOperator Service as a first-class
> > >>> primitive masterfully decouples the heavy inference tasks from the
> > >>> mainstream topology. The fault isolation, independent scaling, and
> > >>> stateless design perfectly match the requirements of modern
> > >>> AI-oriented data processing.
> > >>>
> > >>> This is a clean and robust architecture. Looking forward to seeing
> > >>> this merged!
> > >>>
> > >>>
> > >>> Best wishes,
> > >>> Charles Zhang
> > >>> from Apache InLong
> > >>>
> > >>>
> > >>> On Wed, May 27, 2026 at 14:12, Yi Zhang <[email protected]> wrote:
> > >>>
> > >>> > Hi everyone,
> > >>> >
> > >>> >
> > >>> >
> > >>> > I would like to start a discussion on FLIP-582: Support RpcOperator
> > >>> > Service [1].
> > >>> >
> > >>> >
> > >>> > AI-oriented workloads like multimodal data processing and model
> > >>> > inference have been growing rapidly in recent years. These
> > >>> > workloads are characterized by expensive resources (GPUs) and high
> > >>> > initialization costs (seconds to minutes for model loading). In
> > >>> > today's Flink, embedding them in the data plane couples their
> > >>> > parallelism and failover with surrounding operators; deploying
> > >>> > them as external services disconnects their lifecycle from the job
> > >>> > and doubles operational overhead.
> > >>> >
> > >>> >
> > >>> > This FLIP introduces RpcOperator Service, a framework-level
> > >>> > primitive that runs user-defined compute as RPC services in an
> > >>> > independent Pipelined Region within the Flink job. Because the
> > >>> > service is isolated at the scheduling level, it can achieve fault
> > >>> > isolation, independent scaling, and dedicated resource allocation.
> > >>> > As a native Flink primitive, it also lays the foundation for
> > >>> > automatic flow control, flexible load balancing, and coordinated
> > >>> > auto-scaling, all without introducing external infrastructure or
> > >>> > additional operational burden.
> > >>> >
> > >>> >
> > >>> >
> > >>> >
> > >>> > Looking forward to your feedback and suggestions!
> > >>> >
> > >>> >
> > >>> >
> > >>> >
> > >>> > [1]
> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-582%3A+Support+RpcOperator+Service
> > >>> >
> > >>> >
> > >>> >
> > >>> >
> > >>> >
> > >>> > Best Regards,
> > >>> > Yi Zhang
> > >>>
> > >>
> >
>
