[ https://issues.apache.org/jira/browse/FLINK-38825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

featzhang updated FLINK-38825:
------------------------------
    Description: 
h4. Background

Apache Flink currently provides {{AsyncFunction}} and {{AsyncWaitOperator}} for 
record-level asynchronous I/O.
While this model works well for traditional lookup-style workloads, it does not 
align well with {*}modern AI / ML inference and high-latency external 
services{*}, which typically prefer *batch-based execution* and require tighter 
control over latency, batching, and concurrency.

Typical examples include:
 * GPU-based model inference where batching significantly improves throughput
 * External inference or embedding services exposing batch APIs
 * RPC / database systems with high per-request overhead

To address this gap, this issue introduces a *batch-oriented async processing 
foundation* in the DataStream API.
----
h4. What has been implemented

This work has been implemented incrementally via {*}7 focused pull 
requests{*}, providing a complete and reviewable initial solution:
 # *New public API*
 ** Introduced {{AsyncBatchFunction<IN, OUT>}} ({{{}@PublicEvolving{}}})
 ** Enables users to perform async I/O over a _batch_ of input records
 # *New runtime operator*
 ** Added {{AsyncBatchWaitOperator}} with *unordered semantics*
 ** Supports *size-based batch triggering*
 ** Supports *time-based batch triggering*
 ** Flushes remaining records on end-of-input
 ** Preserves existing async failure semantics
 # *Stream API entry point*
 ** Added {{AsyncDataStream.unorderedWaitBatch(...)}}
 ** Fully additive and consistent with existing async APIs (see the usage sketch after this list)
 # *Robust test coverage*
 ** Batch size triggering
 ** Batch time triggering
 ** Correct result emission
 ** Exception propagation and failure handling
 # *Incremental and review-friendly design*
 ** Implementation intentionally split into multiple PRs
 ** Each PR focuses on a single concern (API, operator, time trigger, tests, 
etc.)
 ** No changes to existing async APIs or behavior
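
End to end, the pieces above compose roughly as in the following *usage sketch*. This is illustrative only: the exact parameter list of {{unorderedWaitBatch}} (timeout, max batch size, max batch delay) and the batch-completion contract of {{ResultFuture}} are assumptions here, and {{embedBatchAsync(...)}} stands in for any user-supplied asynchronous batch client.

{code:java}
// Usage sketch only (imports omitted). The unorderedWaitBatch parameters shown
// (timeout, max batch size, max batch delay) are assumptions about the new
// API, and embedBatchAsync(...) is a hypothetical client returning
// CompletableFuture<List<String>>.
AsyncBatchFunction<String, String> batchFn = new AsyncBatchFunction<String, String>() {
    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) {
        embedBatchAsync(inputs).whenComplete((outputs, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error); // existing failure semantics
            } else {
                resultFuture.complete(outputs); // assumed: one output per input record
            }
        });
    }
};

DataStream<String> results = AsyncDataStream.unorderedWaitBatch(
        inputStream,            // DataStream<String>
        batchFn,
        30, TimeUnit.SECONDS,   // assumed overall timeout per batch
        32,                     // assumed size-based trigger threshold
        Duration.ofMillis(50)); // assumed time-based trigger delay
{code}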

----
h4. Current scope and guarantees
 * Fully backward-compatible
 * No changes to {{AsyncFunction}} or {{AsyncWaitOperator}}
 * Opt-in, additive API only
 * Designed as a minimal but extensible foundation

This implementation already enables *practical batch-based inference pipelines* 
in Flink with significantly reduced boilerplate compared to record-level async 
I/O.
----
h4. What is intentionally NOT included (follow-up work)

The following items are *explicitly out of scope for the initial 
implementation* and can be addressed incrementally in follow-up issues or PRs:
 * Ordered batch semantics
 * Event-time–based batching
 * Retry / timeout / fallback strategies
 * Batch-level concurrency controls
 * Inference-specific metrics and observability
 * SQL / Table API / Python API integration

----
h4. Summary

This issue is no longer a pure proposal:
it now provides a *working, tested, and extensible async batch processing 
primitive* in Flink, suitable for AI inference and other high-latency 
batch-oriented workloads, while keeping the core async API stable and 
backward-compatible.

  was:
h2. Background

Apache Flink provides the {{AsyncFunction}} API and the corresponding 
{{AsyncWaitOperator}} to support asynchronous I/O operations. This abstraction 
works well for traditional request–response use cases such as external lookups.

However, emerging AI inference workloads (e.g. model inference, embedding 
generation, feature enrichment) exhibit different characteristics:
 * High and variable latency (tens to hundreds of milliseconds)
 * Strong preference for batch-based execution
 * Sensitivity to concurrency limits and backpressure
 * Need for inference-level observability (latency, batch size, in-flight requests)

The current record-based {{AsyncFunction}} abstraction does not naturally fit 
these workloads.
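
For reference, the record-level contract in question is the core method of {{org.apache.flink.streaming.api.functions.async.AsyncFunction}}:

{code:java}
// Existing record-level contract (trimmed to its core method): exactly one
// asynchronous invocation is issued per input record, with no batch-level hook.
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}
{code}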
----
h2. Problem

The existing Async I/O mechanism has several limitations when applied to AI 
inference scenarios:
 # *Record-based invocation only*
Each input record triggers an individual asynchronous call, while most 
inference services are optimized for batch execution.
 # *Limited concurrency and backpressure semantics*
The {{capacity}} parameter primarily limits queue size, but does not express 
inference-level concurrency or batch-level flow control.
 # *Lack of batch-level lifecycle control*
Timeout, retry, and fallback logic must be implemented repeatedly by users, 
leading to duplicated and error-prone implementations.
 # *Insufficient observability for inference workloads*
Metrics such as batch size, inference latency percentiles, and inflight 
requests are not available.

These limitations make it difficult to use Flink as a first-class streaming 
engine for AI inference pipelines.
----
h2. Proposal

Introduce a new *batch-oriented asynchronous operator abstraction* that 
complements the existing {{AsyncFunction}} API and is optimized for 
high-latency inference workloads.
h3. Key ideas
 * Preserve backward compatibility with {{AsyncFunction}}
 * Add a new optional abstraction for batch-based async execution
 * Provide a reference operator implementation at the runtime level

h3. Proposed API (illustrative)

 
{code:java}
public interface AsyncBatchFunction<IN, OUT> {
    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture);
}
{code}
h3. Proposed operator
 * {{AsyncBatchWaitOperator}}
 * Buffers incoming records and triggers async calls based on:
 ** batch size
 ** batch time
 * Controls inflight batch concurrency
 * Emits results in unordered mode initially (a minimal sketch of the trigger semantics follows)
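
To make the trigger semantics concrete, below is a minimal, self-contained sketch of size- or time-triggered batch flushing. It is illustrative only, not the proposed operator code: the real operator would rely on Flink's mailbox thread and timer service rather than a raw executor, and the names used here ({{BatchBuffer}}, {{flushAction}}) are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Illustrative sketch of the size/time trigger semantics described above.
 * NOT the proposed operator implementation.
 */
final class BatchBuffer<T> {
    private final int maxBatchSize;            // size-based trigger threshold
    private final long maxDelayMs;             // time-based trigger delay
    private final Consumer<List<T>> flushAction;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final List<T> buffer = new ArrayList<>();
    private ScheduledFuture<?> pendingTimer;

    BatchBuffer(int maxBatchSize, long maxDelayMs, Consumer<List<T>> flushAction) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMs = maxDelayMs;
        this.flushAction = flushAction;
    }

    synchronized void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();                           // size-based trigger
        } else if (buffer.size() == 1) {
            // first element of a fresh batch: arm the time-based trigger
            pendingTimer = timer.schedule(
                    this::timerFired, maxDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void endOfInput() {
        flush();                               // flush remaining records
        timer.shutdown();
    }

    private synchronized void timerFired() {
        flush();                               // time-based trigger
    }

    private void flush() {                     // always called under lock
        if (pendingTimer != null) {
            pendingTimer.cancel(false);
            pendingTimer = null;
        }
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
{code}

A caller would construct it as, e.g., {{new BatchBuffer<>(32, 50, batch -> client.sendBatch(batch))}}, where {{client.sendBatch}} is again a hypothetical batch client call.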

----
h2. Expected Benefits
 * Improved throughput and resource efficiency for inference workloads
 * Clearer backpressure semantics aligned with model serving constraints
 * Reduced boilerplate for users implementing inference pipelines
 * A solid foundation for future AI-oriented extensions (retry policies, 
fallback strategies, richer metrics)

----
h2. Scope and Compatibility
 * This proposal does *not* modify or deprecate {{AsyncFunction}}
 * The new abstraction is fully optional and additive
 * No changes to existing user code are required

----
h2. Follow-ups (Out of Scope)
 * Ordered batch async processing
 * Retry and fallback policies
 * SQL-level inference integration
 * Python operator support

These topics can be addressed incrementally in follow-up issues.
----
*Attachments / References*
 * Related code: {{AsyncWaitOperator}}, {{AsyncFunction}}
 * Motivation: AI inference and model serving workloads in streaming pipelines


> Introduce an AI-friendly Async Batch Operator for high-latency inference 
> workloads
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-38825
>                 URL: https://issues.apache.org/jira/browse/FLINK-38825
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: featzhang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0-preview
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
