[
https://issues.apache.org/jira/browse/FLINK-38825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
featzhang updated FLINK-38825:
------------------------------
Description:
h4. Background
Apache Flink currently provides {{AsyncFunction}} and {{AsyncWaitOperator}} for
record-level asynchronous I/O.
While this model works well for traditional lookup-style workloads, it does not
align well with {*}modern AI / ML inference and high-latency external
services{*}, which typically prefer *batch-based execution* and require tighter
control over latency, batching, and concurrency.
Typical examples include:
* GPU-based model inference where batching significantly improves throughput
* External inference or embedding services exposing batch APIs
* RPC / database systems with high per-request overhead
To address this gap, this issue introduces a *batch-oriented async processing
foundation* in the DataStream API.
----
h4. What has been implemented
This issue has been implemented incrementally via {*}7 focused pull
requests{*}, providing a complete and reviewable initial solution:
# *New public API*
** Introduced {{AsyncBatchFunction<IN, OUT>}} ({{{}@PublicEvolving{}}})
** Enables users to perform async I/O over a _batch_ of input records (see the usage sketch after this list)
# *New runtime operator*
** Added {{AsyncBatchWaitOperator}} with *unordered semantics*
** Supports *size-based batch triggering*
** Supports *time-based batch triggering*
** Flushes remaining records on end-of-input
** Preserves existing async failure semantics
# *Stream API entry point*
** Added {{AsyncDataStream.unorderedWaitBatch(...)}}
** Fully additive and consistent with existing async APIs
# *Robust test coverage*
** Batch size triggering
** Batch time triggering
** Correct result emission
** Exception propagation and failure handling
# *Incremental and review-friendly design*
** Implementation intentionally split into multiple PRs
** Each PR focuses on a single concern (API, operator, time trigger, tests,
etc.)
** No changes to existing async APIs or behavior
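As a rough illustration of how these pieces fit together, the sketch below implements the batch interface for a hypothetical embedding service, assuming the batch interface reuses the existing {{ResultFuture}} for result and failure reporting. The {{EmbeddingClient}} type is invented for this example, and the class is a sketch rather than code from the merged PRs.
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical batch function: one request per batch instead of one per record.
public class EmbeddingBatchFunction implements AsyncBatchFunction<String, float[]> {

    /** Hypothetical client exposing a batch embedding endpoint. */
    interface EmbeddingClient {
        List<float[]> embedAll(List<String> inputs);
    }

    // Hypothetical async batch client, assumed to be initialized elsewhere.
    private transient EmbeddingClient client;

    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultFuture<float[]> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> client.embedAll(inputs))
                .whenComplete((embeddings, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error); // reuse async failure path
                    } else {
                        resultFuture.complete(embeddings);         // one batch of results
                    }
                });
    }
}
{code}
Wiring would then be a single call along the lines of {{AsyncDataStream.unorderedWaitBatch(input, new EmbeddingBatchFunction(), 32, 50, TimeUnit.MILLISECONDS)}}; the batch-size and wait-time arguments and their order are assumed here for illustration and are not confirmed by this issue.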
----
h4. Current scope and guarantees
* Fully backward-compatible
* No changes to {{AsyncFunction}} or {{AsyncWaitOperator}}
* Opt-in, additive API only
* Designed as a minimal but extensible foundation
This implementation already enables *practical batch-based inference pipelines*
in Flink with significantly reduced boilerplate compared to record-level async
I/O.
----
h4. What is intentionally NOT included (follow-up work)
The following items are *explicitly out of scope for the initial
implementation* and can be addressed incrementally in follow-up issues or PRs:
* Ordered batch semantics
* Event-time–based batching
* Retry / timeout / fallback strategies
* Batch-level concurrency controls
* Inference-specific metrics and observability
* SQL / Table API / Python API integration
----
h4. Summary
This issue is no longer a pure proposal:
it now provides a *working, tested, and extensible async batch processing
primitive* in Flink, suitable for AI inference and other high-latency
batch-oriented workloads, while keeping the core async API stable and
backward-compatible.
was:
h2. Background
Apache Flink provides the {{AsyncFunction}} API and the corresponding
{{AsyncWaitOperator}} to support asynchronous I/O operations. This abstraction
works well for traditional request–response use cases such as external lookups.
However, emerging AI inference workloads (e.g. model inference, embedding
generation, feature enrichment) exhibit different characteristics:
* High and variable latency (tens to hundreds of milliseconds)
* Strong preference for batch-based execution
* Sensitivity to concurrency limits and backpressure
* Need for inference-level observability (latency, batch size, in-flight
requests)
The current record-based {{AsyncFunction}} abstraction does not naturally fit
these workloads.
----
h2. Problem
The existing Async I/O mechanism has several limitations when applied to AI
inference scenarios:
# *Record-based invocation only*
Each input record triggers an individual asynchronous call, while most
inference services are optimized for batch execution.
# *Limited concurrency and backpressure semantics*
The {{capacity}} parameter primarily limits queue size, but does not express
inference-level concurrency or batch-level flow control.
# *Lack of batch-level lifecycle control*
Timeout, retry, and fallback logic must be implemented repeatedly by users,
leading to duplicated and error-prone implementations.
# *Insufficient observability for inference workloads*
Metrics such as batch size, inference latency percentiles, and inflight
requests are not available.
These limitations make it difficult to use Flink as a first-class streaming
engine for AI inference pipelines.
----
h2. Proposal
Introduce a new *batch-oriented asynchronous operator abstraction* that
complements the existing {{AsyncFunction}} API and is optimized for
high-latency inference workloads.
h3. Key ideas
* Preserve backward compatibility with {{AsyncFunction}}
* Add a new optional abstraction for batch-based async execution
* Provide a reference operator implementation at the runtime level
h3. Proposed API (illustrative)
{code:java}
import java.util.List;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

/** Illustrative shape: asynchronous I/O over a batch of input records. */
public interface AsyncBatchFunction<IN, OUT> {
    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture);
}
{code}
h3. Proposed operator
* {{AsyncBatchWaitOperator}}
* Buffers incoming records and triggers async calls based on (see the triggering sketch after this list):
** batch size
** batch time
* Controls inflight batch concurrency
* Emits results in unordered mode initially
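To make the triggering behaviour concrete, the following framework-agnostic sketch shows the size/time trigger idea only. The class and method names ({{BatchTrigger}}, {{add}}, {{onTimer}}, {{endInput}}) are illustrative and are not the operator's actual API.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative size/time batch trigger; not the operator's real API. */
final class BatchTrigger<T> {

    private final int maxBatchSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> flush;        // e.g. hands the batch to asyncInvokeBatch
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1;

    BatchTrigger(int maxBatchSize, long maxWaitMillis, Consumer<List<T>> flush) {
        this.maxBatchSize = maxBatchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.flush = flush;
    }

    /** Called for every incoming record. */
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            fire();                               // size-based trigger
        }
    }

    /** Called from a processing-time timer. */
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstElementTime >= maxWaitMillis) {
            fire();                               // time-based trigger
        }
    }

    /** Called on end-of-input to flush the remainder. */
    void endInput() {
        if (!buffer.isEmpty()) {
            fire();
        }
    }

    private void fire() {
        flush.accept(new ArrayList<>(buffer));
        buffer.clear();
        firstElementTime = -1;
    }
}
{code}
In the operator itself, the timer callback would presumably come from Flink's processing-time service, and the flushed batch would be handed to {{asyncInvokeBatch}}; those details are assumptions here, not part of this proposal text.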
----
h2. Expected Benefits
* Improved throughput and resource efficiency for inference workloads
* Clearer backpressure semantics aligned with model serving constraints
* Reduced boilerplate for users implementing inference pipelines
* A solid foundation for future AI-oriented extensions (retry policies,
fallback strategies, richer metrics)
----
h2. Scope and Compatibility
* This proposal does *not* modify or deprecate {{AsyncFunction}}
* The new abstraction is fully optional and additive
* No changes to existing user code are required
----
h2. Follow-ups (Out of Scope)
* Ordered batch async processing
* Retry and fallback policies
* SQL-level inference integration
* Python operator support
These topics can be addressed incrementally in follow-up issues.
----
*Attachments / References*
* Related code: {{{}AsyncWaitOperator{}}}, {{AsyncFunction}}
* Motivation: AI inference and model serving workloads in streaming pipelines
> Introduce an AI-friendly Async Batch Operator for high-latency inference
> workloads
> ----------------------------------------------------------------------------------
>
> Key: FLINK-38825
> URL: https://issues.apache.org/jira/browse/FLINK-38825
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: featzhang
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0-preview
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)