Hey!
This indeed looks very promising and powerful. At the same time, it also
introduces a major new way of defining processing logic ("doing the same
thing").
To this end, I have some high-level questions about how such a large new
piece fits into the existing API and our users' ways of working, which I
couldn't answer from the proposal and the discussion so far.

1. How will users know when to use the RPC operator vs. the existing
DataStream APIs? What will our guidance be?
2. How does the performance (throughput, latency) realistically compare to
a regular DataStream operator? (I guess this relates to question 1 in a way.)
3. Can RPC operators call other RPC operators? If I have a stateless job,
why not turn all my operators into RPC operators to get the independent
scaling/recovery benefits?

Thank you!
Gyula

On Fri, Jun 5, 2026 at 4:27 AM Yi Zhang <[email protected]> wrote:

> Hi Gen,
>
>
> Thanks for the thorough review and the insightful implementation-level
> questions!
> Here are my responses to each point:
>
>
> 1. Heterogeneous resources
> I fully agree that heterogeneous resource scheduling is a critical part of
> realizing the full value of decoupled GPU/CPU scaling. However, as
> mentioned in my reply to Hongshun's Q4, this is fundamentally a resource
> allocation and scheduling problem that goes beyond the scope of this FLIP.
> We plan to address it in a dedicated GPU and resource scheduling FLIP. The
> current design already uses separate SlotSharingGroups for ROS services,
> and the interface is designed to be extended without breaking changes when
> the resource scheduling FLIP lands.
>
>
> 2.1 RpcTask Design
> This is a very thoughtful architectural suggestion, and I appreciate the
> detailed reasoning from your StreamTask experience. You are right that the
> current FLIP focuses more on the interfaces, and some implementation
> details like Mailbox integration deserve further discussion.
> The key reason for not introducing Mailbox in the current RpcTask is that
> the RpcOperator does not yet have a strong enough need for it, especially
> given the stateless design, which already covers a wide range of
> scenarios. The simple queue + single-threaded run loop is sufficient to
> implement all currently required capabilities. This keeps the
> implementation minimal and easy to reason about.
> That said, as the RpcOperator's capabilities expand in the future (such as
> supporting checkpoint participation), introducing Mailbox would be a
> natural and valuable evolution. Importantly, this is a purely internal
> refactoring that does not affect any user-facing interfaces, so it can be
> adopted incrementally when the need arises.
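A minimal sketch of the "simple queue + single-threaded run loop" described above, assuming string requests and modeling the user's `processRequest` as a plain `java.util.function.Function`. `RpcRunLoopSketch`, `submit`, and `runLoop` are hypothetical names, not the FLIP's `RpcTask` implementation. Because only the loop thread ever calls the user function, invocations are serial by construction, no matter how many threads enqueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of a "queue + single-threaded run loop"; not the FLIP's RpcTask code.
final class RpcRunLoopSketch implements AutoCloseable {
    /** A queued request paired with the future that will carry its response. */
    private static final class Request {
        final String payload;
        final CompletableFuture<String> response = new CompletableFuture<>();

        Request(String payload) {
            this.payload = payload;
        }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    private final Function<String, String> processRequest; // stands in for the user's method
    private final Thread loopThread;
    private volatile boolean running = true;

    RpcRunLoopSketch(Function<String, String> processRequest) {
        this.processRequest = processRequest;
        this.loopThread = new Thread(this::runLoop, "rpc-run-loop");
        this.loopThread.setDaemon(true);
        this.loopThread.start();
    }

    /** Safe to call from many network threads: it only enqueues and never runs user code. */
    CompletableFuture<String> submit(String payload) {
        Request request = new Request(payload);
        queue.add(request);
        return request.response;
    }

    /** The only thread that invokes processRequest, so user code never runs concurrently. */
    private void runLoop() {
        while (running) {
            Request request;
            try {
                request = queue.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                return; // close() interrupts the loop; still-queued requests stay incomplete here
            }
            if (request == null) {
                continue; // idle: re-check the running flag
            }
            try {
                request.response.complete(processRequest.apply(request.payload));
            } catch (RuntimeException e) {
                request.response.completeExceptionally(e); // report user errors to the caller
            }
        }
    }

    @Override
    public void close() {
        running = false;
        loopThread.interrupt();
    }

    public static void main(String[] args) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        try (RpcRunLoopSketch loop = new RpcRunLoopSketch(payload -> {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            active.decrementAndGet();
            return payload.toUpperCase();
        })) {
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                responses.add(loop.submit("req-" + i));
            }
            responses.forEach(CompletableFuture::join);
            System.out.println("max concurrent processRequest calls: " + maxActive.get());
        }
    }
}
```

Under a Mailbox-based design, as Gen suggests, request handling would become the default action and checkpoint or cancellation messages would be mails processed ahead of it; the user-facing contract could stay the same.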
>
>
> 2.2 Communication implementation
> This concern is similar to Xintong's question on the communication layer.
> The first version directly reuses Flink's existing Pekko RPC
> infrastructure, but in the longer term, introducing an alternative
> transport layer such as gRPC, which natively supports streaming, would
> address both the large-payload concern and the unfold/flatMap scenario you
> raised. This can be considered part of the communication layer evolution,
> and the upper-level `RosClient` / `RpcOperator` interfaces are designed to
> be transport-agnostic.
>
>
> 2.3 Client-side implementation
> This is indeed a key question. Let me clarify how the client-side
> checkpoint interaction works. The `RosClient` provides an asynchronous
> interface (`CompletableFuture`), and users have two usage patterns:
>
>
> a. Synchronous usage (e.g., `client.request(...).get()` in a
> `ProcessFunction`): the request completes before `processElement`
> returns, so there are no in-flight requests at checkpoint time.
> Checkpointing is not a concern in this case.
>
>
> b. Asynchronous usage: users should use Flink's `AsyncDataStream` with
> `RichAsyncFunction`, which already has built-in handling for pending
> requests during checkpoints. The `RosClient` itself does not need to
> handle any pending request recovery, just as any other async HTTP or RPC
> client used within an `AsyncFunction` does not.
>
>
> In other words, the checkpoint semantics are not a new problem introduced
> by ROS: they are handled by the existing `AsyncDataStream` framework,
> which users are already expected to use for asynchronous invocation
> patterns.
> I will update the FLIP to emphasize this point more clearly.
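To make the two usage patterns above concrete, here is a minimal stdlib-only sketch. `RosClient` is a hypothetical stand-in with a single `request` method returning a `CompletableFuture` (the FLIP's actual signature may differ), and `FakeRosClient` is an illustrative fake that counts in-flight requests. It uses `join()`, the unchecked counterpart of `get()`. In a real job the async path would go through `AsyncDataStream`, whose operator handles pending requests at checkpoint time; that part is not modeled here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: RosClient/FakeRosClient are hypothetical stand-ins, not FLIP-582 APIs.
final class RosClientUsageSketch {

    /** Hypothetical async client contract: one request, one future response. */
    interface RosClient {
        CompletableFuture<String> request(String payload);
    }

    /** Fake client that answers on a background thread and counts unfinished requests. */
    static final class FakeRosClient implements RosClient {
        final AtomicInteger inFlight = new AtomicInteger();

        @Override
        public CompletableFuture<String> request(String payload) {
            inFlight.incrementAndGet();
            return CompletableFuture.supplyAsync(() -> "result:" + payload)
                    // the stage returned by whenComplete completes only after the decrement
                    .whenComplete((value, error) -> inFlight.decrementAndGet());
        }
    }

    /** Pattern (a): block until the response arrives, so nothing is pending on return. */
    static String processSync(RosClient client, String element) {
        return client.request(element).join();
    }

    /** Pattern (b): hand the future back; the caller (AsyncDataStream in Flink) owns it. */
    static CompletableFuture<String> processAsync(RosClient client, String element) {
        return client.request(element).thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        FakeRosClient client = new FakeRosClient();
        String syncResult = processSync(client, "a");
        System.out.println(syncResult + ", in flight after return: " + client.inFlight.get());

        CompletableFuture<String> pending = processAsync(client, "b");
        System.out.println("async result: " + pending.join());
    }
}
```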
>
>
>
>
> 3. ROS as container, not service
> This is a great direction to explore. In fact, the current `RpcOperator`
> interface is flexible enough to support this container-style usage: a user
> could implement `open()` to start an external service process, and the
> `processRequest()` method could simply proxy to the external service or,
> as you suggest, be bypassed entirely if the data plane communicates
> directly with the external service endpoint. This is a viable and valuable
> pattern. One trade-off to note is that some of ROS's built-in
> capabilities, such as framework-level load balancing, would not be
> utilized in this mode, since traffic bypasses the RPC request queue. Users
> would need to evaluate whether the external service provides equivalent
> capabilities or whether the direct-access performance benefit outweighs
> the loss of framework-level features.
> Thank you for raising this; it validates an important use case that the
> current architecture can accommodate.
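A minimal sketch of the container-style pattern described above, assuming a hypothetical `SketchRpcOperator` contract with `open()`, `processRequest()`, and `close()` (the real `RpcOperator` signatures may differ). To stay self-contained it starts an in-JVM `com.sun.net.httpserver.HttpServer` as a stand-in for the external service process; in practice `open()` would launch and health-check a separate (e.g. Python) server. `processRequest()` only forwards over HTTP, and `endpoint()` is the address a data-plane client could call directly, bypassing the request queue and therefore the framework-level load balancing.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical lifecycle contract echoing the open()/processRequest() methods discussed above. */
interface SketchRpcOperator extends AutoCloseable {
    void open();
    String processRequest(String request);
    @Override
    void close();
}

/** Container-style operator: open() brings up the service, processRequest() only proxies. */
final class ProxyRpcOperatorSketch implements SketchRpcOperator {
    private HttpServer externalService; // stand-in for an external model-server process
    private HttpClient httpClient;
    private URI endpoint;

    @Override
    public void open() {
        try {
            // Assumption: a real deployment would start a separate process instead.
            externalService = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        externalService.createContext("/infer", exchange -> {
            String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
            byte[] reply = ("echo:" + body).getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, reply.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(reply);
            }
        });
        externalService.start();
        endpoint = URI.create("http://127.0.0.1:" + externalService.getAddress().getPort() + "/infer");
        httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
    }

    @Override
    public String processRequest(String request) {
        HttpRequest httpRequest = HttpRequest.newBuilder(endpoint)
                .POST(HttpRequest.BodyPublishers.ofString(request))
                .build();
        try {
            return httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while proxying", e);
        }
    }

    /** Address a data-plane client could call directly, bypassing processRequest(). */
    URI endpoint() {
        return endpoint;
    }

    @Override
    public void close() {
        if (externalService != null) {
            externalService.stop(0);
        }
    }

    public static void main(String[] args) {
        try (ProxyRpcOperatorSketch operator = new ProxyRpcOperatorSketch()) {
            operator.open();
            System.out.println(operator.processRequest("hello")); // proxied framework path
            System.out.println("direct endpoint: " + operator.endpoint());
        }
    }
}
```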
>
>
> 4. Naming
> There is indeed a naming inconsistency: `addRpcService` can be renamed to
> `addRpcOperatorService` to align with the other interfaces. Thanks for
> pointing this out.
>
>
> I hope these responses address your concerns. Please feel free to follow
> up with any further questions or suggestions.
>
>
>
>
> Best,
> Yi
>
>
>
> At 2026-06-02 11:15:24, "Gen Luo" <[email protected]> wrote:
> >Hi Yi,
> >
> >Thanks for this impressive proposal. Adding an RPC service deployment
> >primitive can greatly expand Flink's capability scope and flexibility,
> >with clear benefits for AI workloads and agent applications. It can also
> >play a role in scenarios like state management or data lake compaction,
> >which are relevant to but decoupled from the data pipeline. This is a
> >very promising direction. That said, I still have some questions.
> >
> >1. My biggest concern is heterogeneous resources, echoing Hongshun's Q4.
> >Flink today uses homogeneous TM resources sliced for different task
> >requirements, but decoupling GPU/CPU resources and enabling flexible
> >independent scaling is one of the core values of this FLIP — and the
> >current resource allocation approach cannot support this. We likely need
> >new implementations for the resource allocation component.
> >
> >2. Some components are described only at the idea level without
> >implementation details. Hopefully these can be supplemented later or
> >discussed in dedicated threads. A few worth discussing:
> >
> >  - RpcTask: It's currently described as a simple single-threaded loop,
> >with no plan to use Mailbox or OperatorChain. But if we want to support
> >async, checkpointing, etc. later, introducing Mailbox would be valuable:
> >handle the input queue as the defaultAction, with async, checkpoint, and
> >system messages as mails. This keeps semantics consistent with StreamTask
> >and reduces maintenance burden. System events like failover-triggered
> >request cancellation can also be prioritized via the mailbox.
> >OperatorChain is what SubtaskCheckpointCoordinator directly contacts, so
> >having RpcOperator as the only node in the chain wouldn't add much
> >complexity and lowers the cost when checkpoint support comes.
> >
> >  - Communication implementation: Will it reuse the Pekko + proxy
> >mechanism? Pekko has risks with large message bodies (OOM, silently
> >dropping oversized frames, etc.). Since ROS is user-facing, we can't fully
> >avoid this, and large-model scenarios often carry sizable context, so it
> >needs careful handling. Also, the current design doesn't include
> >table-function-style interfaces for unfolding data, so a large unfolded
> >result has to be returned as a single response, inflating per-request
> >payload size and amplifying the same risk.
> >
> >  - Client-side implementation: Only the interface is described, no
> >implementation details. Checkpoint is the prominent issue — since ROS is
> >stateless by design, state recording falls to the client. Do we go with
> >drain or record? Drain may take a long time; record on a keyed stream may
> >cause request distribution confusion after parallelism changes.
> >
> >3. ROS as container, not service: Is this allowed? If the system itself
> >has service capabilities, especially non-Java stacks (agents, LLMs, also
> >Python remote functions), forwarding requests through the Flink Java layer
> >adds unnecessary overhead. In that case RpcOperator would only handle
> >lifecycle, while the framework manages resource scheduling and service
> >discovery without carrying business traffic.
> >
> >nit: addRpcService is inconsistent with other places that use the
> >RpcOperator / ros keywords. Worth unifying.
> >
> >Best,
> >Gen
> >
> >On Mon, Jun 1, 2026 at 2:02 PM Yi Zhang <[email protected]> wrote:
> >
> >> Hi Guowei,
> >>
> >>
> >> Thanks for the thoughtful review and valuable feedback! Your questions
> >> are insightful and have directly driven refinements to the proposal.
> >> Here are my responses to each point:
> >>
> >>
> >> 1. Autoscaling
> >> Supporting a (min, max) concurrency range is a great idea. However,
> >> auto-scaling involves much more than just the API surface: it requires
> >> discussion around scaling policies, coordination with the scheduler,
> >> metric-driven triggers, and more.
> >> To keep this FLIP focused, I'd like to defer that to a follow-up. Fixed
> >> parallelism is a common and necessary capability on its own, and the
> >> first version will support that. The interfaces can be extended later to
> >> include auto-scaling support without breaking changes.
> >>
> >>
> >> 2. Async invocation & end-to-end examples
> >> This is a good point. The `RosClient` already provides an async
> >> interface, so using it with `AsyncDataStream` is indeed more natural and
> >> idiomatic. I will add an asynchronous invocation example to the
> >> proposal. Regarding a complete end-to-end inference example: at the Java
> >> API level, it will still be primarily conceptual, since practical model
> >> inference examples really need Python API support (e.g., loading a
> >> PyTorch or ONNX model). I will refine the existing example to make the
> >> overall invocation chain as close to a real-world scenario as possible.
> >>
> >>
> >> 3. GPU as a resource
> >> This is indeed a key concern. The `extendedResource` API in the original
> >> proposal comes from Flink's existing resource model (introduced by
> >> FLIP-108). However, I agree that GPU deserves first-class treatment
> >> rather than being accessed through a generic extension mechanism.
> >> Promoting GPU to a first-class resource will involve changes to both the
> >> API and the scheduling layer, which we plan to address in a dedicated
> >> FLIP. For now, I will remove the `extendedResource` API from
> >> `RosDescriptor` to keep this FLIP focused on the RPC service primitive
> >> itself. The resource declaration API can be extended once the
> >> GPU-specific FLIP is in place.
> >>
> >>
> >> 4. Serial method invocation
> >> Agreed. I will make the serial invocation semantics more prominent, both
> >> in the FLIP documentation and in the code-level JavaDoc for
> >> `RpcOperator`.
> >>
> >>
> >> 5. Why synchronous methods?
> >> This is a key question. The initial design chose synchronous methods for
> >> simplicity: users don't need to worry about thread safety, and
> >> concurrency is achieved by scaling out instances (parallelism). However,
> >> I recognize that asynchronous processing is a critical requirement for
> >> certain scenarios (e.g., when the RPC service performs expensive
> >> operations that can be parallelized internally). I am planning to extend
> >> the current design to support asynchronous method execution: if a user's
> >> method returns a `CompletableFuture`, the framework will recognize it and
> >> wait for the future to complete before sending the response, without
> >> blocking the invoke thread. This allows the invoke thread to continue
> >> dispatching subsequent requests, while the user controls internal
> >> concurrency. Importantly, `processRequest` calls remain serial; the async
> >> behavior is purely opt-in.
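The opt-in async execution described above can be sketched as follows, assuming the user method is modeled as a `Function<String, Object>` that returns either a plain value or a `CompletableFuture`. `AsyncAwareDispatcherSketch` and `dispatch` are hypothetical names, not FLIP-582 APIs. The invoke thread checks the return type and, for a future, registers a completion callback instead of blocking, so the next request can be dispatched right away while user methods are still invoked one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch of opt-in async method execution; names are illustrative only.
final class AsyncAwareDispatcherSketch {
    /** Stands in for the user's method: returns a plain value or a CompletableFuture. */
    private final Function<String, Object> processRequest;

    AsyncAwareDispatcherSketch(Function<String, Object> processRequest) {
        this.processRequest = processRequest;
    }

    /**
     * Meant to be called only from the single invoke thread, so user calls stay serial.
     * sendResponse receives (value, null) on success or (null, error) on failure.
     */
    void dispatch(String request, BiConsumer<Object, Throwable> sendResponse) {
        final Object result;
        try {
            result = processRequest.apply(request);
        } catch (RuntimeException e) {
            sendResponse.accept(null, e);
            return;
        }
        if (result instanceof CompletableFuture) {
            // Async opt-in: respond once the future completes; the invoke thread moves on now.
            ((CompletableFuture<?>) result).whenComplete(sendResponse);
        } else {
            sendResponse.accept(result, null); // synchronous method: respond immediately
        }
    }

    public static void main(String[] args) {
        List<CompletableFuture<Object>> inProgress = new ArrayList<>();
        AsyncAwareDispatcherSketch dispatcher = new AsyncAwareDispatcherSketch(request -> {
            if (request.startsWith("slow:")) {
                // e.g. work handed to a user-managed pool and completed later
                CompletableFuture<Object> future = new CompletableFuture<>();
                inProgress.add(future);
                return future;
            }
            return "done:" + request;
        });
        BiConsumer<Object, Throwable> print = (value, error) ->
                System.out.println(error == null ? "response " + value : "failure " + error);

        dispatcher.dispatch("slow:1", print); // returns immediately, no response yet
        dispatcher.dispatch("fast:2", print); // answered right away
        inProgress.get(0).complete("done:slow:1"); // the deferred response is sent now
    }
}
```

One consequence worth noting: with async methods, responses can complete out of request order (a fast synchronous request may be answered before an earlier slow one), so the transport presumably has to correlate responses with requests rather than rely on ordering.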
> >>
> >>
> >> 6. SQL UDF usage
> >> The current design is primarily oriented toward the DataStream API, as
> >> it provides the foundational building blocks. I agree that SQL and Table
> >> API support is an important aspect to consider as well. On the
> >> invocation side, both Table API and SQL UDFs share the same runtime path
> >> through `FunctionContext`, so enabling RPC service access from UDFs is
> >> straightforward: I will add a `getRosClient` method to `FunctionContext`
> >> and include that change in this FLIP.
> >> The real challenge for SQL lies on the definition side: how to define
> >> and register an RPC service using pure SQL syntax, which would require
> >> SQL grammar extensions and planner changes. I think that part would be
> >> better addressed in a dedicated follow-up FLIP to give it the attention
> >> it deserves.
> >>
> >>
> >> 7. Service sharing across jobs & lifecycle
> >> In this FLIP, the RPC service is defined as part of the job (via
> >> `env.addRpcService()` before `env.execute()`). It cannot be accessed
> >> across jobs, and it is released when the job reaches a terminal state. I
> >> believe your question is more relevant to the "Shared RPC Service"
> >> mentioned in the Follow-up Tasks section. While it's out of scope for
> >> this FLIP, let me briefly describe the envisioned design: a shared RPC
> >> service would run as an independent job within the same Application,
> >> accessible by all jobs in that Application. It would terminate when the
> >> Application ends, or could be stopped manually via a user-facing API.
> >> The embedded (per-job) service model discussed in this FLIP is simpler
> >> to manage and well-suited for many streaming scenarios, which is why we
> >> focus on it first.
> >> Importantly, both the embedded and shared modes will reuse the same set
> >> of deployment primitives (`RpcOperator`, `RosClient`, `RosDescriptor`,
> >> etc.); the difference is only in lifecycle management and visibility
> >> scope.
> >>
> >>
> >>
> >>
> >> I hope the above addresses your questions. I will update the proposal to
> >> reflect the changes mentioned above. Please feel free to follow up if you
> >> have any further questions or suggestions.
> >>
> >>
> >> Best,
> >> Yi
> >>
> >>
> >>
> >> At 2026-05-28 16:39:49, "Guowei Ma" <[email protected]> wrote:
> >> >Hi Yi,
> >> >
> >> >One more question I'd like to raise is around how the service behaves
> >> >when it's referenced by more than one job. If two jobs are submitted
> >> >within the same application and both reference the same RpcOperator
> >> >service, is that service shared across them, and what is the expected
> >> >behavior in that case?
> >> >I'm also curious how this plays out for bounded (batch) jobs in
> >> >particular: if one job completes while the other is still running, what
> >> >does the service lifecycle look like? Is the service torn down as soon
> >> >as the first job finishes, and if so, how does that affect the job
> >> >that's still running?
> >> >
> >> >Best,
> >> >Guowei
> >> >
> >> >
> >> >On Thu, May 28, 2026 at 2:55 PM Guowei Ma <[email protected]> wrote:
> >> >
> >> >> Hi Yi,
> >> >>
> >> >> Thanks for putting together this FLIP; it's a well-thought-out
> >> >> proposal, and the RpcOperator Service is a really valuable addition for
> >> >> AI/inference workloads on Flink. I have a few questions and suggestions:
> >> >>
> >> >>
> >> >> 1. Autoscaling
> >> >> How is scale-up/scale-down expressed in this design? Could you consider
> >> >> allowing concurrency to be configured as a (min, max) range? Currently,
> >> >> it appears only min = max (i.e., fixed parallelism) is supported.
> >> >>
> >> >>
> >> >> 2. Async invocation & end-to-end examples
> >> >> The example shows a synchronous invocation style. I'd suggest also
> >> >> providing an asynchronous invocation example, or even just a scaffold
> >> >> that exposes only the RpcOperator's handle and method. In addition, it
> >> >> would be best to include a complete end-to-end example of invoking a
> >> >> model (CPU inference would be fine).
> >> >>
> >> >>
> >> >> 3. GPU as a resource
> >> >> GPU is surely a common, first-class resource. Wouldn't accessing it
> >> >> here through an extension mechanism mislead users? It would seem more
> >> >> natural to declare GPU as a standard resource rather than going through
> >> >> an extension.
> >> >>
> >> >>
> >> >> 4. Serial method invocation
> >> >> Each method of an RpcOperator is invoked serially. I'd suggest stating
> >> >> this explicitly in both the documentation and the code comments, so that
> >> >> users (or an AI) can be clearly aware of it.
> >> >>
> >> >>
> >> >> 5. Why synchronous methods?
> >> >> Why is every method in RpcOperator a synchronous call? Does this imply
> >> >> that no model inference supports concurrent invocation?
> >> >>
> >> >>
> >> >> 6. SQL UDF usage
> >> >> How would SQL UDF users invoke this capability? I'd suggest providing a
> >> >> complete example for this as well.
> >> >>
> >> >>
> >> >> Thanks again for the great work!
> >> >>
> >> >> Best,
> >> >> Guowei
> >> >>
> >> >>
> >> >> On Wed, May 27, 2026 at 8:36 PM Charles Zhang <[email protected]> wrote:
> >> >>
> >> >>> Hi Yi, and Flink Community,
> >> >>>
> >> >>> Thanks for bringing up this excellent proposal. I am fully in favor of
> >> >>> FLIP-582.
> >> >>>
> >> >>> In our production workloads, especially regarding large-scale AI
> >> >>> inference, the tight coupling of the data plane and compute units has
> >> >>> always been a major pain point. If an inference subtask fails due to a
> >> >>> GPU OOM or driver issue, triggering a global rollback is incredibly
> >> >>> expensive since model reloading takes minutes.
> >> >>>
> >> >>> The introduction of the RpcOperator Service as a first-class primitive
> >> >>> masterfully decouples the heavy inference tasks from the mainstream
> >> >>> topology. The fault isolation, independent scaling, and stateless
> >> >>> design perfectly match the requirements of modern AI-oriented data
> >> >>> processing.
> >> >>>
> >> >>> This is a clean and robust architecture. Looking forward to seeing
> >> >>> this merged!
> >> >>>
> >> >>>
> >> >>> Best wishes,
> >> >>> Charles Zhang
> >> >>> from Apache InLong
> >> >>>
> >> >>>
> >> >>> On Wed, May 27, 2026 at 14:12, Yi Zhang <[email protected]> wrote:
> >> >>>
> >> >>> > Hi everyone,
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > I would like to start a discussion on FLIP-582: Support RpcOperator
> >> >>> > Service [1].
> >> >>> >
> >> >>> >
> >> >>> > AI-oriented workloads like multimodal data processing and model
> >> >>> > inference have grown rapidly in recent years. These workloads are
> >> >>> > characterized by expensive resources (GPUs) and high initialization
> >> >>> > costs (seconds to minutes for model loading). In today's Flink,
> >> >>> > embedding them in the data plane couples their parallelism and
> >> >>> > failover with surrounding operators; deploying them as external
> >> >>> > services disconnects their lifecycle from the job and doubles
> >> >>> > operational overhead.
> >> >>> >
> >> >>> >
> >> >>> > This FLIP introduces RpcOperator Service, a framework-level
> >> >>> > primitive that runs user-defined compute as RPC services in an
> >> >>> > independent Pipelined Region within the Flink job. Because the
> >> >>> > service is isolated at the scheduling level, it can achieve fault
> >> >>> > isolation, independent scaling, and dedicated resource allocation.
> >> >>> > As a native Flink primitive, it also lays the foundation for
> >> >>> > automatic flow control, flexible load balancing, and coordinated
> >> >>> > auto-scaling, all without introducing external infrastructure or
> >> >>> > additional operational burden.
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > Looking forward to your feedback and suggestions!
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > [1]
> >> >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-582%3A+Support+RpcOperator+Service
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > Best Regards,
> >> >>> > Yi Zhang
> >> >>>
> >> >>
> >>
>
