Hi Yi,

Thanks a lot for the explanation and the update. I still have two small questions I'd like to get your thoughts on:

*1. On auto-scaling*

I think auto-scaling should be a first-class citizen in Flink as a whole, and RpcOperator was proposed precisely to make it achievable for GPU workloads. For that reason, baking a fixed parallelism into the API as the first step doesn't feel right to me; put simply, I don't think we should introduce an API that locks parallelism to a single value. That said, I fully agree that a complete auto-scaling implementation is complex and isn't the focus of this FLIP. So my suggestion is that the first version could support only the degenerate case (min = max) without committing the API itself to fixed parallelism.

*2. On the lifecycle question*

a. This is actually tied to how we express the API. What I have in mind is something like ds.query(new RPCOperatorService), since it leaves no room for ambiguity: the service clearly belongs to a single job, and every job has its own.

b. Consider this scenario: a Flink application-mode deployment containing two jobs, where each job needs its own independent RPC service. With your current design, would I have to call addRpcService twice?

c. If it's StreamExecutionEnvironment that interacts with the RPC operator, that feels more like something shared *across* jobs, analogous to registering a UDF in the Table API, where multiple jobs can use it. That seems to be in tension with the per-job semantics above.

Best,
Guowei

On Tue, Jun 2, 2026 at 11:16 AM Gen Luo <[email protected]> wrote:

> Hi Yi,
>
> Thanks for this impressive proposal. Adding an RPC service deployment primitive can greatly expand Flink's capability scope and flexibility, with clear benefits for AI workloads and agent applications. It can also play a role in scenarios like state management or data lake compaction, which are related to but decoupled from the data pipeline. This is a very promising direction. That said, I still have some questions.
>
> 1. My biggest concern is heterogeneous resources, echoing Hongshun's Q4.
> Flink today uses homogeneous TM resources that are sliced for different task requirements, but decoupling GPU and CPU resources and scaling them flexibly and independently is one of the core values of this FLIP, and the current resource allocation approach cannot support that. We likely need new implementations for the resource allocation component.
>
> 2. Some components are described only at the idea level, without implementation details. Hopefully these can be supplemented later or discussed in dedicated threads. A few worth discussing:
>
> - RpcTask: It's currently described as a simple single-threaded loop, with no plan to use Mailbox or OperatorChain. But if we want to support async, checkpointing, etc. later, introducing Mailbox would be valuable: handle the input queue as the defaultAction, with async, checkpoint, and system messages as mails. This keeps the semantics consistent with StreamTask and reduces the maintenance burden. System events such as failover-triggered request cancellation can also be prioritized via the mailbox. OperatorChain is what SubtaskCheckpointCoordinator contacts directly, so having RpcOperator as the only node in the chain wouldn't add much complexity and would lower the cost when checkpoint support comes.
>
> - Communication implementation: Will it reuse the Pekko + proxy mechanism? Pekko has risks with large message bodies (OOM, silently dropped oversized frames, etc.). Since ROS is user-facing, we can't fully avoid this, and large-model scenarios often carry sizable context, so it needs careful handling. Also, the current design doesn't include table-function-style interfaces for unfolding data, so a large unfolded result has to be returned as a single response, inflating the per-request payload size and amplifying the same risk.
>
> - Client-side implementation: Only the interface is described, with no implementation details. Checkpointing is the most prominent issue: since ROS is stateless by design, recording state falls to the client.
> Do we go with drain or record? Draining may take a long time; recording on a keyed stream may leave request distribution inconsistent after parallelism changes.
>
> 3. ROS as a container, not a service: is this allowed? If the system itself already has service capabilities, especially non-Java stacks (agents, LLMs, and also Python remote functions), forwarding requests through the Flink Java layer adds unnecessary overhead. In that case RpcOperator would only handle the lifecycle, while the framework manages resource scheduling and service discovery without carrying business traffic.
>
> nit: addRpcService is inconsistent with other places that use RpcOperator / the ros keyword. Worth unifying.
>
> Best,
> Gen
>
> On Mon, Jun 1, 2026 at 2:02 PM Yi Zhang <[email protected]> wrote:
>
> > Hi Guowei,
> >
> > Thanks for the thoughtful review and valuable feedback! Your questions are insightful and have directly driven refinements to the proposal. Here are my responses to each point:
> >
> > 1. Autoscaling
> > Supporting a (min, max) concurrency range is a great idea. However, auto-scaling involves much more than the API surface: it requires discussion of scaling policies, coordination with the scheduler, metric-driven triggers, and more. To keep this FLIP focused, I'd like to defer that to a follow-up. Fixed parallelism is a common and necessary capability on its own, and the first version will support it. The interfaces can be extended later to include auto-scaling support without breaking changes.
> >
> > 2. Async invocation & end-to-end examples
> > This is a good point. The `RosClient` already provides an async interface, so using it with `AsyncDataStream` is indeed more natural and idiomatic. I will add an asynchronous invocation example to the proposal. Regarding a complete
Regarding a complete > > end-to-end > > inference example: at the Java API level, it will still be primarily > > conceptual, since > > practical model inference examples really need Python API support (e.g., > > loading a > > PyTorch or ONNX model). I will refine the existing example to make the > > overall > > invocation chain as close to a real-world scenario as possible. > > > > > > 3. GPU as a resource > > This is indeed a key concern. The `extendedResource` API in the original > > proposal > > comes from Flink's existing resource model (introduced by FLIP-108). > > However, I > > agree that GPU deserves first-class treatment rather than being accessed > > through a > > generic extension mechanism. Promoting GPU to a first-class resource will > > involve > > changes to both the API and the scheduling layer, which we plan to > address > > in a > > dedicated FLIP. For now, I will remove the `extendedResource` API from > > `RosDescriptor` to keep this FLIP focused on the RPC service primitive > > itself. The > > resource declaration API can be extended once the GPU-specific FLIP is in > > place. > > > > > > 4. Serial method invocation > > Agreed. I will make the serial invocation semantics more prominent — both > > in the > > FLIP documentation and in the code-level JavaDoc for `RpcOperator`. > > > > > > 5. Why synchronous methods? > > This is a key question. The initial design chose synchronous methods for > > simplicity > > — users don't need to worry about thread safety, and concurrency is > > achieved by > > scaling out instances (parallelism). However, I recognize that > > asynchronous > > processing is a critical requirement for certain scenarios (e.g., when > the > > RPC service > > performs expensive operations that can be parallelized internally). 
I am > > planning to > > extend the current design to support asynchronous method execution: if a > > user's > > method returns a `CompletableFuture`, the framework will recognize it and > > wait for > > the future to complete before sending the response, without blocking the > > invoke > > thread. This allows the invoke thread to continue dispatching subsequent > > requests, > > while the user controls internal concurrency. Importantly, > > `processRequest` calls > > remain serial — the async behavior is purely opt-in. > > > > > > 6. SQL UDF usage > > The current design is primarily oriented toward the DataStream API, as it > > provides > > the foundational building blocks. I agree that SQL and Table API support > > is an > > important aspect to consider as well. On the invocation side, both Table > > API and > > SQL UDFs share the same runtime path through `FunctionContext`, so > > enabling > > RPC service access from UDFs is straightforward — I will add a > > `getRosClient` > > method to `FunctionContext` and include that change in this FLIP. > > The real challenge for SQL lies on the definition side: how to define and > > register > > an RPC service using pure SQL syntax, which would require SQL grammar > > extensions and planner changes. I think that part would be better > > addressed in > > a dedicated follow-up FLIP to give it the attention it deserves. > > > > > > 7. Service sharing across jobs & lifecycle > > In this FLIP, the RPC service is defined as part of the job (via > > `env.addRpcService()` > > before `env.execute()`). It cannot be accessed across jobs, and it is > > released when > > the job reaches a terminal state. I believe your question is more > relevant > > to the > > "Shared RPC Service" mentioned in the Follow-up Tasks section. 
While it's > > out of > > scope for this FLIP, let me briefly describe the envisioned design: a > > shared RPC > > service would run as an independent job within the same Application, > > accessible by > > all jobs in that Application. It would terminate when the Application > > ends, or could be > > stopped manually via a user-facing API. > > The embedded (per-job) service model discussed in this FLIP is simpler to > > manage > > and well-suited for many streaming scenarios, which is why we focus on it > > first. > > Importantly, both the embedded and shared modes will reuse the same set > of > > deployment primitives (`RpcOperator`, `RosClient`, `RosDescriptor`, etc.) > > — the > > difference is only in lifecycle management and visibility scope. > > > > > > > > > > I hope the above addresses your questions. I will update the proposal to > > reflect the > > changes mentioned above. Please feel free to follow up if you have any > > further > > questions or suggestions. > > > > > > Best, > > Yi > > > > > > > > At 2026-05-28 16:39:49, "Guowei Ma" <[email protected]> wrote: > > >HI,YI > > > > > >One more question I'd like to raise is around how the service behaves > when > > >it's referenced by more than one job. If two jobs are submitted within > the > > >same application and both reference the same RpcOperator service, is > that > > >service shared across them, and what is the expected behavior in that > > case? > > >I'm also curious how this plays out for bounded (batch) jobs in > > particular: > > >if one job completes while the other is still running, what does the > > >service lifecycle look like — is the service torn down as soon as the > > first > > >job finishes, and if so, how does that affect the job that's still > > running? 
> > >
> > >Best,
> > >Guowei
> > >
> > >On Thu, May 28, 2026 at 2:55 PM Guowei Ma <[email protected]> wrote:
> > >
> > >> Hi Yi,
> > >>
> > >> Thanks for putting together this FLIP. It's a well-thought-out proposal, and the RpcOperator Service is a really valuable addition for AI/inference workloads on Flink. I have a few questions and suggestions:
> > >>
> > >> 1. Autoscaling
> > >> How is scale-up/scale-down expressed in this design? Could you consider allowing concurrency to be configured as a (min, max) range? Currently it appears that only min = max (i.e. fixed parallelism) is supported.
> > >>
> > >> 2. Async invocation & end-to-end examples
> > >> The example shows a synchronous invocation style. I'd suggest also providing an asynchronous invocation example, or even just a scaffold that exposes only the RpcOperator's handle and method. In addition, it would be best to include a complete end-to-end example of invoking a model (CPU inference would be fine).
> > >>
> > >> 3. GPU as a resource
> > >> GPU is surely a common, first-class resource. Wouldn't accessing it through an extension mechanism mislead users? It would seem more natural to declare GPU as a standard resource rather than going through an extension.
> > >>
> > >> 4. Serial method invocation
> > >> Each method of an RpcOperator is invoked serially. I'd suggest stating this explicitly in both the documentation and the code comments, so that users (or an AI) are clearly aware of it.
> > >>
> > >> 5. Why synchronous methods?
> > >> Why is every method in RpcOperator a synchronous call? Does this imply that no model inference supports concurrent invocation?
> > >>
> > >> 6. SQL UDF usage
> > >> How would SQL UDF users invoke this capability? I'd suggest providing a complete example for this as well.
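Points 4 and 5, together with Yi's June 1 answer, describe serial invocation with an opt-in async path. Under that design, if the user method returns a `CompletableFuture`, the response is sent only when the future completes, and the invoke thread moves on to the next request in the meantime. The following is a minimal plain-Java sketch of those semantics, not the FLIP-582 runtime. It assumes a single-thread executor plays the role of the invoke thread. `SerialDispatcherSketch`, `userMethod`, and `sendResponse` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Hypothetical sketch, not the FLIP-582 implementation: the user method is called
 * serially on one invoke thread; a CompletableFuture return value is answered on
 * completion, so the invoke thread can dispatch the next request meanwhile.
 */
public class SerialDispatcherSketch {
    // A single-thread executor guarantees the user method never runs concurrently.
    private final ExecutorService invokeThread = Executors.newSingleThreadExecutor();
    private final Function<String, Object> userMethod;     // hypothetical user method
    private final BiConsumer<String, Object> sendResponse; // hypothetical reply channel

    public SerialDispatcherSketch(Function<String, Object> userMethod,
                                  BiConsumer<String, Object> sendResponse) {
        this.userMethod = userMethod;
        this.sendResponse = sendResponse;
    }

    /** Queues a request; the user method is invoked later on the invoke thread. */
    public void submit(String requestId, String payload) {
        invokeThread.execute(() -> {
            Object result;
            try {
                result = userMethod.apply(payload);
            } catch (RuntimeException e) {
                sendResponse.accept(requestId, e); // a thrown exception becomes the reply
                return;
            }
            if (result instanceof CompletableFuture) {
                // Opt-in async path: reply when the future completes; don't block here.
                CompletableFuture<?> future = (CompletableFuture<?>) result;
                future.whenComplete((value, error) ->
                        sendResponse.accept(requestId, error != null ? error : value));
            } else {
                // Sync path: the return value is the reply.
                sendResponse.accept(requestId, result);
            }
        });
    }

    /** Stops accepting requests and waits (up to 5 s) for queued invocations to return. */
    public void close() throws InterruptedException {
        invokeThread.shutdown();
        invokeThread.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> slow = new CompletableFuture<>();
        List<String> responses = Collections.synchronizedList(new ArrayList<>());
        SerialDispatcherSketch dispatcher = new SerialDispatcherSketch(
                payload -> payload.equals("slow") ? slow : "echo:" + payload,
                (id, reply) -> responses.add(id + "=" + reply));

        dispatcher.submit("r1", "slow"); // method returns a still-pending future
        dispatcher.submit("r2", "fast"); // dispatched without waiting for r1's reply
        dispatcher.close();
        System.out.println(responses); // [r2=echo:fast]

        slow.complete("done"); // completing the future triggers r1's reply
        System.out.println(responses); // [r2=echo:fast, r1=done]
    }
}
```

In `main`, the user method is never entered concurrently, yet r2 is answered before r1 because r1's reply waits on its future. Internal concurrency therefore stays entirely in the user's hands, which matches the "purely opt-in" wording.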
> > >>
> > >> Thanks again for the great work!
> > >>
> > >> Best,
> > >> Guowei
> > >>
> > >> On Wed, May 27, 2026 at 8:36 PM Charles Zhang <[email protected]> wrote:
> > >>
> > >>> Hi Yi and the Flink community,
> > >>>
> > >>> Thanks for bringing up this excellent proposal. I am fully in favor of FLIP-582.
> > >>>
> > >>> In our production workloads, especially large-scale AI inference, the tight coupling of the data plane and compute units has always been a major pain point. If an inference subtask fails due to a GPU OOM or a driver issue, triggering a global rollback is incredibly expensive, since model reloading takes minutes.
> > >>>
> > >>> Introducing the RpcOperator Service as a first-class primitive masterfully decouples the heavy inference tasks from the main topology. The fault isolation, independent scaling, and stateless design perfectly match the requirements of modern AI-oriented data processing.
> > >>>
> > >>> This is a clean and robust architecture. Looking forward to seeing this merged!
> > >>>
> > >>> Best wishes,
> > >>> Charles Zhang
> > >>> from Apache InLong
> > >>>
> > >>> Yi Zhang <[email protected]> wrote on Wed, May 27, 2026 at 14:12:
> > >>>
> > >>> > Hi everyone,
> > >>> >
> > >>> > I would like to start a discussion on FLIP-582: Support RpcOperator Service [1].
> > >>> >
> > >>> > AI-oriented workloads such as multimodal data processing and model inference have grown rapidly in recent years. These workloads are characterized by expensive resources (GPUs) and high initialization costs (seconds to minutes for model loading).
> > >>> > In today's Flink, embedding them in the data plane couples their parallelism and failover with the surrounding operators, while deploying them as external services disconnects their lifecycle from the job and doubles the operational overhead.
> > >>> >
> > >>> > This FLIP introduces RpcOperator Service, a framework-level primitive that runs user-defined compute as RPC services in an independent Pipelined Region within the Flink job. Because the service is isolated at the scheduling level, it can achieve fault isolation, independent scaling, and dedicated resource allocation. As a native Flink primitive, it also lays the foundation for automatic flow control, flexible load balancing, and coordinated auto-scaling, all without introducing external infrastructure or additional operational burden.
> > >>> >
> > >>> > Looking forward to your feedback and suggestions!
> > >>> >
> > >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-582%3A+Support+RpcOperator+Service
> > >>> >
> > >>> > Best Regards,
> > >>> > Yi Zhang
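Earlier in this thread, Gen's RpcTask bullet suggests building the task on a mailbox instead of a bare single-threaded loop. In that model, the input queue is the default action, and async completions, checkpoint triggers, and cancellations arrive as mails. The following is a rough, minimal sketch of that loop shape in plain Java. It assumes simple `java.util.concurrent` queues rather than Flink's own mailbox classes. `MailboxLoopSketch`, its urgent/normal mail split, and the `cancel` helper are hypothetical. The sketch shows that every mail runs on the task thread between requests, so task-local state needs no locks, and that urgent mails overtake normal ones.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical sketch of a mailbox-style RpcTask loop (plain Java, not Flink's classes).
 * Default action: take one request from the input queue and process it.
 * Mails: control actions (cancellation, async completions, checkpoint triggers) that
 * other threads enqueue and the task thread runs between requests.
 */
public class MailboxLoopSketch {
    // Any thread may enqueue; only the task thread polls.
    private final Queue<Runnable> urgentMails = new ConcurrentLinkedQueue<>();
    private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();
    private final Queue<String> input = new ConcurrentLinkedQueue<>();

    // Touched only on the task thread (inside mails or the default action), so no locks.
    private final Set<String> cancelled = new HashSet<>();
    final List<String> log = new ArrayList<>();

    public void enqueueRequest(String requestId) {
        input.add(requestId);
    }

    public void sendMail(Runnable mail, boolean urgent) {
        (urgent ? urgentMails : mails).add(mail);
    }

    /** A control event, e.g. a request the client abandoned after a failover. */
    public void cancel(String requestId) {
        sendMail(() -> {
            cancelled.add(requestId);
            log.add("cancel:" + requestId);
        }, true);
    }

    /** Runs all pending mails, then one request, repeatedly; returns once all queues are empty. */
    public void runUntilIdle() {
        while (true) {
            Runnable mail;
            // Every pending mail runs before the next request; urgent ones first.
            while ((mail = nextMail()) != null) {
                mail.run();
            }
            String request = input.poll();
            if (request == null) {
                return; // a real task would block waiting for input or mail instead
            }
            processRequest(request);
        }
    }

    private Runnable nextMail() {
        Runnable urgent = urgentMails.poll();
        return urgent != null ? urgent : mails.poll();
    }

    private void processRequest(String requestId) {
        if (cancelled.remove(requestId)) {
            log.add("skip:" + requestId);
        } else {
            // Stand-in for calling the user's RpcOperator method and sending the reply.
            log.add("process:" + requestId);
        }
    }

    public static void main(String[] args) {
        MailboxLoopSketch task = new MailboxLoopSketch();
        task.enqueueRequest("a");
        task.enqueueRequest("b");
        task.enqueueRequest("c");
        task.sendMail(() -> task.log.add("checkpoint"), false); // normal mail
        task.cancel("b"); // urgent mail: enqueued last, runs first
        task.runUntilIdle();
        System.out.println(task.log); // [cancel:b, checkpoint, process:a, skip:b, process:c]
    }
}
```

A real task would block when idle instead of returning. Flink's StreamTask mailbox also has its own priority and yield semantics, which this sketch does not try to reproduce.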
