Hi Hongshun,

Thank you for the in-depth review and the highly insightful questions! Your
feedback has prompted us to think more carefully about several important
runtime aspects.


1. Cost of recomputation due to stateless design
This is a real and practical concern. For RPC services where request latency
is manageable, the cost of recomputation upon data-plane failover is
acceptable. The volume of replayed requests can be effectively controlled by
reducing the checkpoint interval, which limits the window of data that needs
to be reprocessed.
However, for long-running requests, the wasted computation deserves serious
consideration. Server-side result caching keyed by request identity is a
promising direction. That said, caching alone may not be sufficient: cache
hits depend on replayed requests being routed to the **same service
instance** that processed the original request. This introduces a tension
with load balancing: a sticky routing strategy would improve cache hit rates
but may compromise load distribution, making it suitable only for specific
scenarios where request affinity is acceptable.
I think this is best addressed as a relatively independent future
optimization, combining server-side caching with a configurable request
routing strategy. For the current FLIP, the stateless design remains the
right starting point for its simplicity and broad applicability.
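
To make the caching idea concrete, here is a minimal sketch of a per-instance
result cache keyed by request identity. It is only an illustration under
assumptions: `RequestResultCache`, `getOrCompute`, and `evict` are hypothetical
names and not part of the FLIP-582 API, and eviction policy (for example,
dropping entries once a checkpoint completes) is left open.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical server-side result cache; not part of the FLIP-582 API. */
final class RequestResultCache<K, V> {
    // One future per request ID, so a replayed request can also attach to a
    // computation that is still in progress instead of starting a new one.
    private final Map<K, CompletableFuture<V>> results = new ConcurrentHashMap<>();

    /** Returns the cached or in-progress result for requestId; computes at most once. */
    CompletableFuture<V> getOrCompute(K requestId, Function<K, V> compute) {
        return results.computeIfAbsent(
                requestId,
                id -> CompletableFuture.supplyAsync(() -> compute.apply(id)));
    }

    /** Drops an entry that will never be replayed again (policy is an assumption). */
    void evict(K requestId) {
        results.remove(requestId);
    }
}
```

Because the map lives inside one service instance, a replayed request only
benefits if it is routed back to that same instance, which is exactly the
routing tension described above.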


2. Cancelling in-flight RPC requests on data-plane failure
This is an excellent suggestion. Upon data-plane failure, the framework can
indeed cancel requests that are still **queued** on the service side. The
mechanism works as follows: when a data-plane operator is about to restart,
it sends a cancellation signal to the service endpoints; upon receiving this
signal, the service removes the requests associated with that client to avoid
unnecessary computation.
It is worth noting that requests **already being processed** cannot be
effectively aborted. Cancelling the potentially large backlog of queued
requests is where the real value lies. I will include this cancellation
mechanism in the FLIP as part of the proposed design.
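
As a rough illustration of the service-side half of this mechanism, the sketch
below drops all still-queued requests of one client when a cancellation signal
arrives. All names (`ServiceRequestQueue`, `cancelClient`, the `clientId` tag)
are hypothetical and chosen for this example only; the actual design in the
FLIP may look different.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical service-side request queue; names are illustrative only. */
final class ServiceRequestQueue {
    /** A queued request tagged with the ID of the data-plane client that issued it. */
    static final class QueuedRequest {
        final String clientId;
        final String payload;

        QueuedRequest(String clientId, String payload) {
            this.clientId = clientId;
            this.payload = payload;
        }
    }

    private final Deque<QueuedRequest> queue = new ArrayDeque<>();

    synchronized void enqueue(String clientId, String payload) {
        queue.addLast(new QueuedRequest(clientId, payload));
    }

    /** Hands the next request to a worker; after this it can no longer be cancelled here. */
    synchronized QueuedRequest poll() {
        return queue.pollFirst();
    }

    /** Handles a cancellation signal: removes the client's queued requests, returns the count. */
    synchronized int cancelClient(String clientId) {
        int before = queue.size();
        queue.removeIf(request -> request.clientId.equals(clientId));
        return before - queue.size();
    }

    synchronized int size() {
        return queue.size();
    }
}
```

A request already handed out by `poll()` is untouched by `cancelClient`,
which mirrors the distinction between queued and already-processing requests.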


3. Data-plane behavior when an RPC service subtask fails
There is a fast-failure mechanism in place. The underlying transport layer
typically detects connection failures quickly. For example, the Pekko-based
implementation is TCP-based, so when a service instance fails and the
connection drops, pending RPC calls on the client side receive an exception
without waiting for a timeout. The client can catch this exception and retry
the request against other surviving instances.
When the service has only one instance (`parallelism = 1`) and that instance
fails, the `ServiceRegistry` will have no available endpoints. In this case,
`RosClient` will **fail fast** with an `UNAVAILABLE` error rather than
blocking. The data-plane operator can then decide how to handle this based on
its own error handling logic.
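
The client-side behavior described in this section could be sketched roughly
as follows. This is not the `RosClient` implementation: `FailoverSketch`,
`callWithFailover`, and `UnavailableException` are hypothetical names, the
endpoint list stands in for whatever the `ServiceRegistry` returns, and a
dropped connection is modeled as an `UncheckedIOException`.

```java
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Function;

/** Hypothetical client-side failover loop; the real RosClient API may differ. */
final class FailoverSketch {
    /** Stand-in for the UNAVAILABLE error mentioned above. */
    static final class UnavailableException extends RuntimeException {
        UnavailableException(String message) {
            super(message);
        }
    }

    /**
     * Tries each known endpoint in order. A dropped connection moves on to the
     * next endpoint immediately, without waiting for a timeout. If no endpoint
     * is left (or none was registered), fails fast instead of blocking.
     */
    static <R> R callWithFailover(List<String> endpoints, Function<String, R> call) {
        for (String endpoint : endpoints) {
            try {
                return call.apply(endpoint);
            } catch (UncheckedIOException connectionLost) {
                // Connection dropped: retry against the next surviving instance.
            }
        }
        throw new UnavailableException("UNAVAILABLE: no available endpoints");
    }
}
```

With a single registered instance that has failed, the loop ends after one
attempt and the caller sees the `UNAVAILABLE` error right away, leaving the
retry or failure policy to the data-plane operator.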


4. Resource utilization concerns with separate SlotSharingGroup
This is indeed a fundamental issue at the Flink resource model level.
Currently, Flink does not natively support heterogeneous TaskManager
specifications within a single cluster. That said, it is worth clarifying
that slots with different resource profiles **can** coexist on the same
TaskManager through Flink's
[fine-grained resource management](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/).
However, the practical challenge is **resource fragmentation**: heterogeneous
slots on a uniform TM template can lead to underutilized resources.
I believe that supporting more flexible heterogeneous TM specifications is an
important direction worth pursuing. We plan to address this as a key topic in
a dedicated FLIP focused on GPU resource scheduling, where we can give it the
thorough treatment it deserves, including heterogeneous TM templates,
GPU-aware placement, and node pool strategies.



Thank you again for these thoughtful and insightful questions!


Best,
Yi



At 2026-05-28 19:34:23, "Hongshun Wang" <[email protected]> wrote:
>Hi Yi,
>
>Thanks for the great work on FLIP-582. I've been reading through the
>proposal and have a few questions regarding the failover/recovery semantics
>and resource efficiency of RpcOperator Service. I'd appreciate any
>clarification.
>
>
>1. Cost of recomputation due to stateless design
>
>The proposal states that "RPC services do not participate in
>checkpointing." This means that upon any data-plane failure and checkpoint
>rollback, all in-flight RPC requests are lost and must be re-triggered from
>scratch. For workloads where a single RPC invocation is expensive (e.g.,
>LLM inference taking seconds to minutes per request), this could lead to
>significant wasted computation.
>
>Has there been any consideration of a lightweight mechanism to avoid
>redundant recomputation — for example, a result cache on the service side
>keyed by request identity, or an optional "resumable request" protocol? Or
>is the position that this cost is acceptable given the target workload
>characteristics?
>
>
>2. Cancelling in-flight RPC requests on data-plane failure
>
>When a data-plane operator fails and its Pipelined Region restarts, there
>may still be RPC requests in progress on the service side that were issued
>by the now-restarting data-plane tasks. Since those results will be
>discarded anyway (the data plane will re-issue them after recovery),
>continuing to process them wastes resources — particularly expensive GPU
>cycles.
>
>Is there a plan to provide an interface or mechanism for the data plane (or
>the framework) to proactively cancel outstanding RPC requests upon
>failure/restart? For example, a cancellation signal propagated from the
>RosClient to the RPC service, or a request-scoping mechanism tied to the
>checkpoint epoch, so that the service side can abort stale requests early.
>
>
>3. Data-plane behavior when an RPC service subtask fails
>
>The proposal mentions that when an RPC service subtask fails, "requests are
>routed to other surviving instances." However, I'm curious about the
>behavior for requests that are already in-flight to the failed instance:
>
>   - Does the data-plane operator have to wait for a timeout (as
>   configured) before it knows the request has failed?
>   - Is there any fast-failure notification mechanism (e.g.,
>   connection-closed detection) that allows the RosClient to immediately fail
>   over in-flight requests to other instances without waiting for the full
>   timeout?
>   - If the service has only one instance (parallelism = 1) and it fails,
>   does the data plane simply block (with backpressure) until the instance
>   recovers, or does it fail fast with UNAVAILABLE errors?
>
>A fast-failure path would be important for reducing end-to-end latency
>impact during service-side failures.
>
>
>4. Resource utilization concerns with separate SlotSharingGroup
>
>The proposal states that "each RPC service automatically creates a separate
>SlotSharingGroup" and that this is "fully compatible with Flink's existing
>K8s/YARN resource management." However, in Flink's current resource model,
>all TaskManagers share the same configuration template (determined by
>global taskmanager.memory.* and CPU settings). Fine-Grained Resource
>Management controls how slots are sized within a TM, but does not enable
>heterogeneous TM specifications across the cluster.
>
>This raises a practical concern: if an RpcOperator requires GPU resources,
>the TM configuration must include GPU allocation. Since all TMs share the
>same template, this means every TM in the cluster would be provisioned with
>GPUs — even those exclusively hosting data-plane tasks that have no GPU
>requirement. This leads to significant GPU resource waste.
>
>Furthermore, the separate SlotSharingGroup design prevents co-locating
>RpcOperator and data-plane tasks on the same TaskManager. In practice,
>co-location (e.g., RpcOperator using the GPU while data-plane tasks use the
>CPU on the same GPU node) would improve overall resource utilization. The
>current design prioritizes isolation at the cost of efficiency.
>
>Has there been consideration of:
>
>   - Supporting heterogeneous TaskManager specifications (different TM
>   templates for different SlotSharingGroups) to avoid provisioning GPU on
>   CPU-only TMs?(As a reference, KubeRay's Worker Group model addresses
>   exactly this problem for Ray clusters. In KubeRay, a RayCluster defines
>   multiple workerGroupSpecs, each with its own pod template, resource
>   configuration, node selectors, and independent autoscaling bounds)
>   - Allowing optional co-location of RpcOperator and data-plane slots on
>   the same TM (in separate slots) for scenarios where utilization is
>   prioritized over strict process-level isolation?
>   - Or is there a dependency on other sub-FLIPs under FLIP-577 to address
>   this gap in GPU resource scheduling?
>
>Thanks in advance for the clarification. Looking forward to the continued
>discussion.
>
>
>Best regards
>
>Hongshun
>
>On Thu, May 28, 2026 at 4:40 PM Guowei Ma <[email protected]> wrote:
>
>> HI,YI
>>
>> One more question I'd like to raise is around how the service behaves when
>> it's referenced by more than one job. If two jobs are submitted within the
>> same application and both reference the same RpcOperator service, is that
>> service shared across them, and what is the expected behavior in that case?
>> I'm also curious how this plays out for bounded (batch) jobs in particular:
>> if one job completes while the other is still running, what does the
>> service lifecycle look like — is the service torn down as soon as the first
>> job finishes, and if so, how does that affect the job that's still running?
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, May 28, 2026 at 2:55 PM Guowei Ma <[email protected]> wrote:
>>
>> > Hi,YI
>> >
>> > Thanks for putting together this FLIP — it's a well-thought-out proposal,
>> > and the RpcOperator Service is a really valuable addition for
>> AI/inference
>> > workloads on Flink. I have a few questions and suggestions:
>> >
>> >
>> > 1. Autoscaling
>> > How is scale-up/scale-down expressed in this design? Could you consider
>> > allowing concurrency to be configured as a (min, max) range? Currently it
>> > appears only min = max (i.e. fixed parallelism) is supported.
>> >
>> >
>> > 2. Async invocation & end-to-end examples
>> > The example shows a synchronous invocation style. I'd suggest also
>> > providing an asynchronous invocation example — or even just a scaffold
>> that
>> > exposes only the RpcOperator's handle and method. In addition, it would
>> be
>> > best to include a complete end-to-end example of invoking a model (CPU
>> > inference would be fine).
>> >
>> >
>> > 3. GPU as a resource
>> > GPU is surely a common/first-class resource. Accessing it here through an
>> > extend mechanism — wouldn't that mislead users? It would seem more
>> > natural to declare GPU as a standard resource rather than going through
>> an
>> > extension.
>> >
>> >
>> > 4. Serial method invocation
>> > Each method of an RpcOperator is invoked serially. I'd suggest stating
>> > this explicitly in both the documentation and the code comments, so that
>> > users — or an AI — can be clearly aware of it.
>> >
>> >
>> > 5. Why synchronous methods?
>> > Why is every method in RpcOperator a synchronous call? Does this imply
>> > that no model inference supports concurrent invocation?
>> >
>> >
>> > 6. SQL UDF usage
>> > How would SQL UDF users invoke this capability? I'd suggest providing a
>> > complete example for this as well.
>> >
>> >
>> > Thanks again for the great work!
>> >
>> > Best,
>> > Guowei
>> >
>> >
>> > On Wed, May 27, 2026 at 8:36 PM Charles Zhang <[email protected]>
>> > wrote:
>> >
>> >> Hi Yi, and Flink Community,
>> >>
>> >> Thanks for bringing up this excellent proposal. I am fully in favor of
>> >> FLIP-582.
>> >>
>> >> In our production workloads, especially regarding large-scale AI
>> >> inference,
>> >> the tight coupling of the data plane and compute units has always been a
>> >> major pain point. If an inference subtask fails due to a GPU OOM or
>> driver
>> >> issue, triggering a global rollback is incredibly expensive since model
>> >> reloading takes minutes.
>> >>
>> >> The introduction of the RpcOperator Service as a first-class primitive
>> >> masterfully decouples the heavy inference tasks from the mainstream
>> >> topology. The fault isolation, independent scaling, and stateless design
>> >> perfectly match the requirements of modern AI-oriented data processing.
>> >>
>> >> This is a clean and robust architecture. Looking forward to seeing this
>> >> merged!
>> >>
>> >>
>> >> Best wishes,
>> >> Charles Zhang
>> >> from Apache InLong
>> >>
>> >>
>> >> Yi Zhang <[email protected]> wrote on Wed, May 27, 2026 at 14:12:
>> >>
>> >> > Hi everyone,
>> >> >
>> >> >
>> >> >
>> >> > I would like to start a discussion on FLIP-582: Support RpcOperator
>> >> > Service [1].
>> >> >
>> >> >
>> >> > AI-oriented workloads like multimodal data processing and model
>> >> inference
>> >> > are
>> >> > growing rapidly in recent years. These workloads are characterized by
>> >> > expensive
>> >> > resources (GPUs) and high initialization costs (seconds to minutes for
>> >> > model
>> >> > loading). In today's Flink, embedding them in the data plane couples
>> >> their
>> >> > parallelism and failover with surrounding operators; deploying them as
>> >> > external
>> >> > services disconnects their lifecycle from the job and doubles
>> >> operational
>> >> > overhead.
>> >> >
>> >> >
>> >> > This FLIP introduces RpcOperator Service — a framework-level primitive
>> >> > that runs
>> >> > user-defined compute as RPC services in an independent Pipelined
>> Region
>> >> > within
>> >> > the Flink job. Because the service is isolated at the scheduling
>> level,
>> >> it
>> >> > can achieve
>> >> > fault isolation, independent scaling, and dedicated resource
>> allocation.
>> >> > As a native
>> >> > Flink primitive, it also lays the foundation for automatic flow
>> control,
>> >> > flexible load
>> >> > balancing, and coordinated auto-scaling — all without introducing
>> >> external
>> >> > infrastructure or additional operational burden.
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > Looking forward to your feedback and suggestions!
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > [1]
>> >> >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-582%3A+Support+RpcOperator+Service
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > Best Regards,
>> >> > Yi Zhang
>> >>
>> >
>>
