Hi Yi,

Thanks for the FLIP. +1 from my side.

Requiring operators that are connected and communicating to restart and rescale as a whole is a necessary constraint in Flink's stateful stream processing model, since that is what keeps state consistent. But in AI scenarios, many of the heavy, costly operators (model inference, multimodal processing, and the like) are themselves stateless. Their real problem is high initialization cost and expensive resources, and applying the same all-or-nothing constraint to them becomes a serious burden. This FLIP decouples exactly these operators from the data plane's Pipelined Region, which addresses the mismatch at its root. I like the direction.

There is also a clear need for an RpcOperator from the Flink Agents side. Take the subagent pattern: a main agent spawns one or more subagents to handle subtasks, and that workload is highly bursty. Within the same job, one moment's input may need several subagents while the next needs none. Statically allocating subagent resources per operator parallelism doesn't work well here. Under-allocate and the operator is overwhelmed as soon as demand arrives; over-allocate and the resources sit idle most of the time. The natural fit is to run subagents in a shared resource pool that multiple upstream subtasks can use and that scales elastically with load. That maps directly onto what the RpcOperator provides: each instance forms its own Pipelined Region, which gives independent scaling and flexible load balancing.

One question on the communication layer. The first version reuses Pekko RPC, but the proposal only discusses latency overhead and QPS, not the size of a single message. Pekko has a default cap on per-message size, and in AI scenarios the context carried in a single call can be quite large (multimodal inputs, long context, etc.). Has this case been considered? How should large messages be handled: by chunking, by raising the limit, or by going through a different transport?
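
To make the chunking option concrete, here is a rough sketch of what it could look like. It is not taken from the FLIP or from Flink. The `PayloadChunker` class, its `Chunk` record and the chunk size are hypothetical names made up for illustration. The sketch assumes three things:

- a request payload can be treated as an opaque `byte[]`;
- the sender knows the frame limit it must stay under (in recent Flink versions the RPC frame limit is configured via `pekko.framesize`);
- Java 16+ is available for records.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not a Flink or FLIP-582 API): splits an oversized RPC payload
 * into chunks that each stay under a caller-chosen size limit, and reassembles them.
 */
public final class PayloadChunker {

    /** One slice of a payload; requestId, index and total let the receiver reassemble it. */
    public record Chunk(String requestId, int index, int total, byte[] data) {}

    private PayloadChunker() {}

    /** Splits into ceil(length / maxChunkBytes) chunks; an empty payload yields one empty chunk. */
    public static List<Chunk> split(String requestId, byte[] payload, int maxChunkBytes) {
        if (maxChunkBytes <= 0) {
            throw new IllegalArgumentException("maxChunkBytes must be positive");
        }
        // Ceiling division done in long arithmetic to avoid int overflow near 2 GB payloads.
        int total = (int) Math.max(1L, (payload.length + (long) maxChunkBytes - 1) / maxChunkBytes);
        List<Chunk> chunks = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * maxChunkBytes;
            int to = from + Math.min(maxChunkBytes, payload.length - from);
            chunks.add(new Chunk(requestId, i, total, Arrays.copyOfRange(payload, from, to)));
        }
        return chunks;
    }

    /** Reassembles the chunks of one request, in any arrival order, into the original payload. */
    public static byte[] join(List<Chunk> chunks) {
        if (chunks.isEmpty()) {
            throw new IllegalArgumentException("no chunks to join");
        }
        String requestId = chunks.get(0).requestId();
        int total = chunks.get(0).total();
        Chunk[] ordered = new Chunk[total];
        for (Chunk c : chunks) {
            // Reject chunks from another request, with a different total, out of range, or duplicated.
            boolean valid = c.requestId().equals(requestId) && c.total() == total
                    && c.index() >= 0 && c.index() < total && ordered[c.index()] == null;
            if (!valid) {
                throw new IllegalArgumentException("inconsistent or duplicate chunk " + c.index());
            }
            ordered[c.index()] = c;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < total; i++) {
            if (ordered[i] == null) {
                throw new IllegalStateException("missing chunk " + i + " of " + total);
            }
            out.writeBytes(ordered[i].data());
        }
        return out.toByteArray();
    }
}
```

Consider a 25 MB multimodal context and an 8 MB chunk size. The caller would send four chunks under one requestId, and the service would buffer them until `total` chunks have arrived before invoking the user function. The trade-off versus simply raising the limit is that reassembly state has to live on the service side. That state also has to be cleaned up when a request is abandoned, e.g. after a data-plane failover.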

Best,
Xintong

On Thu, May 28, 2026 at 7:35 PM Hongshun Wang wrote:

> Hi Yi,
>
> Thanks for the great work on FLIP-582. I've been reading through the
> proposal and have a few questions regarding the failover/recovery
> semantics and resource efficiency of RpcOperator Service. I'd appreciate
> any clarification.
>
>
> 1. Cost of recomputation due to stateless design
>
> The proposal states that "RPC services do not participate in
> checkpointing." This means that upon any data-plane failure and
> checkpoint rollback, all in-flight RPC requests are lost and must be
> re-triggered from scratch. For workloads where a single RPC invocation is
> expensive (e.g., LLM inference taking seconds to minutes per request),
> this could lead to significant wasted computation.
>
> Has there been any consideration of a lightweight mechanism to avoid
> redundant recomputation, for example a result cache on the service side
> keyed by request identity, or an optional "resumable request" protocol?
> Or is the position that this cost is acceptable given the target
> workload characteristics?
>
>
> 2. Cancelling in-flight RPC requests on data-plane failure
>
> When a data-plane operator fails and its Pipelined Region restarts,
> there may still be RPC requests in progress on the service side that
> were issued by the now-restarting data-plane tasks. Since those results
> will be discarded anyway (the data plane will re-issue them after
> recovery), continuing to process them wastes resources, particularly
> expensive GPU cycles.
>
> Is there a plan to provide an interface or mechanism for the data plane
> (or the framework) to proactively cancel outstanding RPC requests upon
> failure/restart? For example, a cancellation signal propagated from the
> RosClient to the RPC service, or a request-scoping mechanism tied to the
> checkpoint epoch, so that the service side can abort stale requests
> early.
>
>
> 3. Data-plane behavior when an RPC service subtask fails
>
> The proposal mentions that when an RPC service subtask fails, "requests
> are routed to other surviving instances." However, I'm curious about the
> behavior for requests that are already in flight to the failed instance:
>
> - Does the data-plane operator have to wait for a timeout (as
>   configured) before it knows the request has failed?
> - Is there any fast-failure notification mechanism (e.g.,
>   connection-closed detection) that allows the RosClient to immediately
>   fail over in-flight requests to other instances without waiting for
>   the full timeout?
> - If the service has only one instance (parallelism = 1) and it fails,
>   does the data plane simply block (with backpressure) until the
>   instance recovers, or does it fail fast with UNAVAILABLE errors?
>
> A fast-failure path would be important for reducing the end-to-end
> latency impact of service-side failures.
>
>
> 4. Resource utilization concerns with separate SlotSharingGroup
>
> The proposal states that "each RPC service automatically creates a
> separate SlotSharingGroup" and that this is "fully compatible with
> Flink's existing K8s/YARN resource management." However, in Flink's
> current resource model, all TaskManagers share the same configuration
> template (determined by global taskmanager.memory.* and CPU settings).
> Fine-Grained Resource Management controls how slots are sized within a
> TM, but does not enable heterogeneous TM specifications across the
> cluster.
>
> This raises a practical concern: if an RpcOperator requires GPU
> resources, the TM configuration must include GPU allocation. Since all
> TMs share the same template, every TM in the cluster would be
> provisioned with GPUs, even those exclusively hosting data-plane tasks
> that have no GPU requirement. This leads to significant GPU resource
> waste.
>
> Furthermore, the separate SlotSharingGroup design prevents co-locating
> RpcOperator and data-plane tasks on the same TaskManager. In practice,
> co-location (e.g., the RpcOperator using the GPU while data-plane tasks
> use the CPU on the same GPU node) would improve overall resource
> utilization. The current design prioritizes isolation at the cost of
> efficiency.
>
> Has there been consideration of:
>
> - Supporting heterogeneous TaskManager specifications (different TM
>   templates for different SlotSharingGroups) to avoid provisioning GPUs
>   on CPU-only TMs? (As a reference, KubeRay's Worker Group model
>   addresses exactly this problem for Ray clusters. In KubeRay, a
>   RayCluster defines multiple workerGroupSpecs, each with its own pod
>   template, resource configuration, node selectors, and independent
>   autoscaling bounds.)
> - Allowing optional co-location of RpcOperator and data-plane slots on
>   the same TM (in separate slots) for scenarios where utilization is
>   prioritized over strict process-level isolation?
> - Or is there a dependency on other sub-FLIPs under FLIP-577 to address
>   this gap in GPU resource scheduling?
>
> Thanks in advance for the clarification. Looking forward to the
> continued discussion.
>
>
> Best regards,
>
> Hongshun
>
> On Thu, May 28, 2026 at 4:40 PM Guowei Ma wrote:
>
> > Hi Yi,
> >
> > One more question I'd like to raise is around how the service behaves
> > when it's referenced by more than one job. If two jobs are submitted
> > within the same application and both reference the same RpcOperator
> > service, is that service shared across them, and what is the expected
> > behavior in that case? I'm also curious how this plays out for bounded
> > (batch) jobs in particular: if one job completes while the other is
> > still running, what does the service lifecycle look like? Is the
> > service torn down as soon as the first job finishes, and if so, how
> > does that affect the job that's still running?
> >
> > Best,
> > Guowei
> >
> >
> > On Thu, May 28, 2026 at 2:55 PM Guowei Ma wrote:
> >
> > > Hi Yi,
> > >
> > > Thanks for putting together this FLIP. It's a well-thought-out
> > > proposal, and the RpcOperator Service is a really valuable addition
> > > for AI/inference workloads on Flink. I have a few questions and
> > > suggestions:
> > >
> > >
> > > 1. Autoscaling
> > > How is scale-up/scale-down expressed in this design? Could you
> > > consider allowing concurrency to be configured as a (min, max) range?
> > > Currently it appears only min = max (i.e., fixed parallelism) is
> > > supported.
> > >
> > >
> > > 2. Async invocation & end-to-end examples
> > > The example shows a synchronous invocation style. I'd suggest also
> > > providing an asynchronous invocation example, or even just a scaffold
> > > that exposes only the RpcOperator's handle and method. In addition, it
> > > would be best to include a complete end-to-end example of invoking a
> > > model (CPU inference would be fine).
> > >
> > >
> > > 3. GPU as a resource
> > > GPU is surely a common, first-class resource. Wouldn't accessing it
> > > here through an extension mechanism mislead users? It would seem more
> > > natural to declare GPU as a standard resource rather than going
> > > through an extension.
> > >
> > >
> > > 4. Serial method invocation
> > > Each method of an RpcOperator is invoked serially. I'd suggest stating
> > > this explicitly in both the documentation and the code comments, so
> > > that users (or an AI) are clearly aware of it.
> > >
> > >
> > > 5. Why synchronous methods?
> > > Why is every method in RpcOperator a synchronous call? Does this imply
> > > that no model inference supports concurrent invocation?
> > >
> > >
> > > 6. SQL UDF usage
> > > How would SQL UDF users invoke this capability? I'd suggest providing
> > > a complete example for this as well.
> > >
> > >
> > > Thanks again for the great work!
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Wed, May 27, 2026 at 8:36 PM Charles Zhang wrote:
> > >
> > >> Hi Yi, and Flink Community,
> > >>
> > >> Thanks for bringing up this excellent proposal. I am fully in favor of
> > >> FLIP-582.
> > >>
> > >> In our production workloads, especially large-scale AI inference, the
> > >> tight coupling of the data plane and compute units has always been a
> > >> major pain point. If an inference subtask fails due to a GPU OOM or
> > >> driver issue, triggering a global rollback is incredibly expensive,
> > >> since model reloading takes minutes.
> > >>
> > >> The introduction of the RpcOperator Service as a first-class primitive
> > >> masterfully decouples the heavy inference tasks from the main
> > >> topology. The fault isolation, independent scaling, and stateless
> > >> design perfectly match the requirements of modern AI-oriented data
> > >> processing.
> > >>
> > >> This is a clean and robust architecture. Looking forward to seeing
> > >> this merged!
> > >>
> > >>
> > >> Best wishes,
> > >> Charles Zhang
> > >> from Apache InLong
> > >>
> > >>
> > >> On Wed, May 27, 2026 at 2:12 PM Yi Zhang wrote:
> > >>
> > >> > Hi everyone,
> > >> >
> > >> > I would like to start a discussion on FLIP-582: Support RpcOperator
> > >> > Service [1].
> > >> >
> > >> > AI-oriented workloads like multimodal data processing and model
> > >> > inference have grown rapidly in recent years. These workloads are
> > >> > characterized by expensive resources (GPUs) and high initialization
> > >> > costs (seconds to minutes for model loading). In today's Flink,
> > >> > embedding them in the data plane couples their parallelism and
> > >> > failover with surrounding operators; deploying them as external
> > >> > services disconnects their lifecycle from the job and doubles
> > >> > operational overhead.
> > >> >
> > >> > This FLIP introduces RpcOperator Service, a framework-level primitive
> > >> > that runs user-defined compute as RPC services in an independent
> > >> > Pipelined Region within the Flink job. Because the service is
> > >> > isolated at the scheduling level, it can achieve fault isolation,
> > >> > independent scaling, and dedicated resource allocation. As a native
> > >> > Flink primitive, it also lays the foundation for automatic flow
> > >> > control, flexible load balancing, and coordinated auto-scaling, all
> > >> > without introducing external infrastructure or additional
> > >> > operational burden.
> > >> >
> > >> > Looking forward to your feedback and suggestions!
> > >> >
> > >> > [1]
> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-582%3A+Support+RpcOperator+Service
> > >> >
> > >> > Best Regards,
> > >> > Yi Zhang
