featzhang created FLINK-38825:
---------------------------------
Summary: Introduce an AI-friendly Async Batch Operator for
high-latency inference workloads
Key: FLINK-38825
URL: https://issues.apache.org/jira/browse/FLINK-38825
Project: Flink
Issue Type: Improvement
Components: Runtime / Task
Reporter: featzhang
Fix For: 2.0-preview
h2. Background
Apache Flink provides the {{AsyncFunction}} API and the corresponding
{{AsyncWaitOperator}} to support asynchronous I/O operations. This abstraction
works well for traditional request–response use cases such as external lookups.
However, emerging AI inference workloads (e.g. model inference, embedding
generation, feature enrichment) exhibit different characteristics:
* High and variable latency (tens to hundreds of milliseconds)
* Strong preference for batch-based execution
* Sensitivity to concurrency limits and backpressure
* Need for inference-level observability (latency, batch size, inflight requests)
The current record-based {{AsyncFunction}} abstraction does not naturally fit
these workloads.
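For reference, a minimal sketch of the current record-at-a-time pattern; {{EmbeddingClient}} and its {{embedAsync}} method are hypothetical placeholders for an asynchronous inference client, while the Flink types are the existing Async I/O API:
{code:java}
import java.util.Collections;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// One asynchronous call is issued per input record; there is no built-in way
// to group several records into a single inference request.
public class EmbeddingAsyncFunction implements AsyncFunction<String, float[]> {

    // Hypothetical non-blocking client; created inline here only for brevity.
    private final EmbeddingClient client = new EmbeddingClient();

    @Override
    public void asyncInvoke(String input, ResultFuture<float[]> resultFuture) {
        client.embedAsync(input)   // assumed to return CompletableFuture<float[]>
              .whenComplete((embedding, error) -> {
                  if (error != null) {
                      resultFuture.completeExceptionally(error);
                  } else {
                      resultFuture.complete(Collections.singleton(embedding));
                  }
              });
    }
}
{code}
Wiring goes through e.g. {{AsyncDataStream.unorderedWait(inputs, new EmbeddingAsyncFunction(), 500, TimeUnit.MILLISECONDS, 100)}}, where the {{capacity}} argument bounds in-flight records rather than batches.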
----
h2. Problem
The existing Async I/O mechanism has several limitations when applied to AI
inference scenarios:
# *Record-based invocation only*
Each input record triggers an individual asynchronous call, while most
inference services are optimized for batch execution.
# *Limited concurrency and backpressure semantics*
The {{capacity}} parameter only bounds the number of in-flight records in the
operator's queue; it does not express inference-level concurrency or
batch-level flow control.
# *Lack of batch-level lifecycle control*
Timeout, retry, and fallback logic must be implemented repeatedly by users
(see the sketch at the end of this section), leading to duplicated and
error-prone implementations.
# *Insufficient observability for inference workloads*
Metrics such as batch size, inference latency percentiles, and the number of
inflight requests are not exposed out of the box.
These limitations make it difficult to use Flink as a first-class streaming
engine for AI inference pipelines.
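To illustrate point 3, a sketch of the timeout/retry/fallback handling that users currently hand-write inside {{asyncInvoke}} ({{EmbeddingClient}} and {{embedAsync}} are again hypothetical placeholders):
{code:java}
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Every inference pipeline ends up repeating some variant of this logic.
public class RetryingEmbeddingAsyncFunction implements AsyncFunction<String, float[]> {

    private final EmbeddingClient client = new EmbeddingClient();  // hypothetical client

    @Override
    public void asyncInvoke(String input, ResultFuture<float[]> resultFuture) {
        client.embedAsync(input)
              .orTimeout(500, TimeUnit.MILLISECONDS)   // hand-rolled per-record timeout
              .whenComplete((embedding, error) -> {
                  if (error == null) {
                      resultFuture.complete(Collections.singleton(embedding));
                      return;
                  }
                  // hand-rolled single retry, then fall back to an empty result
                  client.embedAsync(input).whenComplete((retried, retryError) -> {
                      if (retryError == null) {
                          resultFuture.complete(Collections.singleton(retried));
                      } else {
                          resultFuture.complete(Collections.emptyList());
                      }
                  });
              });
    }
}
{code}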
----
h2. Proposal
Introduce a new *batch-oriented asynchronous operator abstraction* that
complements the existing {{AsyncFunction}} API and is optimized for
high-latency inference workloads.
h3. Key ideas
* Preserve backward compatibility with {{AsyncFunction}}
* Add a new optional abstraction for batch-based async execution
* Provide a reference operator implementation at the runtime level
h3. Proposed API (illustrative)
{code:java}
public interface AsyncBatchFunction<IN, OUT> {

    void asyncInvokeBatch(
            List<IN> inputs,
            ResultFuture<OUT> resultFuture);
}
{code}
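An illustrative implementation against this interface could look as follows; {{EmbeddingClient.embedBatchAsync}} is a hypothetical batch-capable inference call assumed to return a {{CompletableFuture}} with one result per input record:
{code:java}
import java.util.List;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Sends the whole buffered batch as a single inference request.
// AsyncBatchFunction is the interface proposed above.
public class EmbeddingBatchFunction implements AsyncBatchFunction<String, float[]> {

    private final EmbeddingClient client = new EmbeddingClient();  // hypothetical client

    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultFuture<float[]> resultFuture) {
        client.embedBatchAsync(inputs)   // assumed: CompletableFuture<List<float[]>>
              .whenComplete((embeddings, error) -> {
                  if (error != null) {
                      resultFuture.completeExceptionally(error);
                  } else {
                      resultFuture.complete(embeddings);   // one result per input record
                  }
              });
    }
}
{code}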
h3. Proposed operator
* {{AsyncBatchWaitOperator}}
* Buffers incoming records and triggers async calls based on:
** batch size
** batch timeout (maximum buffering delay)
* Controls inflight batch concurrency
* Emits results in unordered mode initially
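A purely hypothetical wiring sketch, intended only to show the configuration surface implied by the bullets above; the {{AsyncBatchDataStream}} helper and its parameters are not proposed API:
{code:java}
// Hypothetical helper mirroring AsyncDataStream; names and signature are illustrative only.
DataStream<float[]> embeddings = AsyncBatchDataStream.unorderedWait(
        inputs,                        // DataStream<String>
        new EmbeddingBatchFunction(),  // AsyncBatchFunction<String, float[]>
        32,                            // batch size trigger: flush once 32 records are buffered
        Duration.ofMillis(50),         // batch time trigger: flush a partial batch after 50 ms
        4);                            // inflight batch concurrency: at most 4 pending batches
{code}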
----
h2. Expected Benefits
* Improved throughput and resource efficiency for inference workloads
* Clearer backpressure semantics aligned with model serving constraints
* Reduced boilerplate for users implementing inference pipelines
* A solid foundation for future AI-oriented extensions (retry policies,
fallback strategies, richer metrics)
----
h2. Scope and Compatibility
* This proposal does *not* modify or deprecate {{AsyncFunction}}
* The new abstraction is fully optional and additive
* No changes to existing user code are required
----
h2. Follow-ups (Out of Scope)
* Ordered batch async processing
* Retry and fallback policies
* SQL-level inference integration
* Python operator support
These topics can be addressed incrementally in follow-up issues.
----
*Attachments / References*
* Related code: {{AsyncWaitOperator}}, {{AsyncFunction}}
* Motivation: AI inference and model serving workloads in streaming pipelines
--
This message was sent by Atlassian Jira
(v8.20.10#820010)