[
https://issues.apache.org/jira/browse/FLINK-38825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38825:
-----------------------------------
Labels: pull-request-available (was: )
> Introduce an AI-friendly Async Batch Operator for high-latency inference
> workloads
> ----------------------------------------------------------------------------------
>
> Key: FLINK-38825
> URL: https://issues.apache.org/jira/browse/FLINK-38825
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: featzhang
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> h2. Background
> Apache Flink provides the {{AsyncFunction}} API and the corresponding
> {{AsyncWaitOperator}} to support asynchronous I/O operations. This
> abstraction works well for traditional request–response use cases such as
> external lookups.
> However, emerging AI workloads (e.g. model inference, embedding
> generation, feature enrichment) exhibit different characteristics:
> * High and variable latency (tens to hundreds of milliseconds)
> * Strong preference for batch-based execution
> * Sensitivity to concurrency limits and backpressure
> * Need for inference-level observability (latency, batch size,
> inflight requests)
> The current record-based {{AsyncFunction}} abstraction does not naturally fit
> these workloads.
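> For context, the per-record pattern with the existing API typically looks like
> the sketch below. This is illustrative only: {{callModel}} stands in for a real
> model-serving client, and the timeout and capacity values are arbitrary.
> {code:java}
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.functions.async.AsyncFunction;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
>
> /** Today's per-record pattern; callModel is a placeholder for a serving client. */
> public class ScoringFunction implements AsyncFunction<String, Float> {
>
>     @Override
>     public void asyncInvoke(String input, ResultFuture<Float> resultFuture) {
>         // One asynchronous call per record.
>         CompletableFuture.supplyAsync(() -> callModel(input))
>                 .thenAccept(score -> resultFuture.complete(Collections.singleton(score)));
>     }
>
>     private float callModel(String input) {
>         return input.length() * 0.1f; // placeholder for a real inference RPC
>     }
>
>     /** Wiring: capacity bounds in-flight records, not batches or model concurrency. */
>     public static DataStream<Float> attach(DataStream<String> inputs) {
>         return AsyncDataStream.unorderedWait(
>                 inputs, new ScoringFunction(), 500, TimeUnit.MILLISECONDS, 100);
>     }
> }
> {code}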
> ----
> h2. Problem
> The existing Async I/O mechanism has several limitations when applied to AI
> inference scenarios:
> # *Record-based invocation only*
> Each input record triggers an individual asynchronous call, whereas most
> inference services are optimized for batch execution.
> # *Limited concurrency and backpressure semantics*
> The {{capacity}} parameter primarily limits queue size, but does not express
> inference-level concurrency or batch-level flow control.
> # *Lack of batch-level lifecycle control*
> Timeout, retry, and fallback logic must be implemented repeatedly by users,
> leading to duplicated and error-prone implementations.
> # *Insufficient observability for inference workloads*
> Metrics such as batch size, inference latency percentiles, and inflight
> requests are not available.
> These limitations make it difficult to use Flink as a first-class streaming
> engine for AI inference pipelines.
> ----
> h2. Proposal
> Introduce a new *batch-oriented asynchronous operator abstraction* that
> complements the existing {{AsyncFunction}} API and is optimized for
> high-latency inference workloads.
> h3. Key ideas
> * Preserve backward compatibility with {{AsyncFunction}}
> * Add a new optional abstraction for batch-based async execution
> * Provide a reference operator implementation at the runtime level
> h3. Proposed API (illustrative)
> {code:java}
> /** Batch-oriented counterpart to AsyncFunction: invoked once per batch. */
> public interface AsyncBatchFunction<IN, OUT> {
>     void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture);
> } {code}
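> An implementation would then issue a single call per batch. The sketch below is
> illustrative and assumes the proposed interface above; {{batchPredict}} stands in
> for a batched serving-client call, and the batch's outputs are completed as one
> collection, as the illustrative signature suggests.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
>
> /** Illustrative only: one asynchronous call per batch instead of one per record. */
> public class EmbeddingBatchFunction implements AsyncBatchFunction<String, float[]> {
>
>     @Override
>     public void asyncInvokeBatch(List<String> inputs, ResultFuture<float[]> resultFuture) {
>         CompletableFuture.supplyAsync(() -> batchPredict(inputs))
>                 .thenAccept(resultFuture::complete);
>     }
>
>     private List<float[]> batchPredict(List<String> inputs) {
>         // Placeholder for a single batched inference RPC.
>         List<float[]> out = new ArrayList<>(inputs.size());
>         for (String s : inputs) {
>             out.add(new float[] {s.length()});
>         }
>         return out;
>     }
> }
> {code}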
> h3. Proposed operator
> * {{AsyncBatchWaitOperator}}
> * Buffers incoming records and triggers async calls based on:
> ** batch size
> ** batch time
> * Controls inflight batch concurrency
> * Emits results in unordered mode initially (see the wiring sketch below)
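> For illustration, wiring could look roughly as follows. The
> {{AsyncDataStream.unorderedWaitBatch}} helper and its parameters are hypothetical
> and only show the intended knobs; {{EmbeddingBatchFunction}} is the sketch above.
> {code:java}
> // Hypothetical API; the helper and its parameters are illustrative only.
> DataStream<float[]> embeddings = AsyncDataStream.unorderedWaitBatch(
>         inputs,                        // DataStream<String>
>         new EmbeddingBatchFunction(),
>         32,                            // max batch size: flush once 32 records buffered
>         Duration.ofMillis(50),         // max batch age: flush partially filled batches
>         4);                            // max inflight batches: batch-level concurrency
> {code}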
> ----
> h2. Expected Benefits
> * Improved throughput and resource efficiency for inference workloads
> * Clearer backpressure semantics aligned with model serving constraints
> * Reduced boilerplate for users implementing inference pipelines
> * A solid foundation for future AI-oriented extensions (retry policies,
> fallback strategies, richer metrics)
> ----
> h2. Scope and Compatibility
> * This proposal does *not* modify or deprecate {{AsyncFunction}}
> * The new abstraction is fully optional and additive
> * No changes to existing user code are required
> ----
> h2. Follow-ups (Out of Scope)
> * Ordered batch async processing
> * Retry and fallback policies
> * SQL-level inference integration
> * Python operator support
> These topics can be addressed incrementally in follow-up issues.
> ----
> *Attachments / References*
> * Related code: {{AsyncWaitOperator}}, {{AsyncFunction}}
> * Motivation: AI inference and model serving workloads in streaming pipelines
--
This message was sent by Atlassian Jira
(v8.20.10#820010)