[ 
https://issues.apache.org/jira/browse/FLINK-38825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

featzhang updated FLINK-38825:
------------------------------
    Description: 
h2. Background

Apache Flink provides the {{AsyncFunction}} API and the corresponding 
{{AsyncWaitOperator}} to support asynchronous I/O operations. This abstraction 
works well for traditional request–response use cases such as external lookups.
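
For contrast, here is the existing per-record contract (a simplified excerpt of 
{{org.apache.flink.streaming.api.functions.async.AsyncFunction}}; the default {{timeout}} method is omitted):
{code:java}
// Simplified excerpt of Flink's existing async contract:
// one asyncInvoke call is issued for every input record.
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /** Invoked once per input record; the result future is completed asynchronously. */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}
{code}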

However, emerging AI inference workloads (e.g. model inference, embedding 
generation, feature enrichment) exhibit different characteristics:
 * High and variable latency (tens to hundreds of milliseconds)
 * Strong preference for batch-based execution
 * Sensitivity to concurrency limits and backpressure
 * Need for inference-level observability (latency, batch size, inflight requests)

The current record-based {{AsyncFunction}} abstraction does not naturally fit 
these workloads.
----
h2. Problem

The existing Async I/O mechanism has several limitations when applied to AI 
inference scenarios:
 # *Record-based invocation only*
Each input record triggers an individual asynchronous call, while most 
inference services are optimized for batch execution.
 # *Limited concurrency and backpressure semantics*
The {{capacity}} parameter primarily limits queue size, but does not express 
inference-level concurrency or batch-level flow control.
 # *Lack of batch-level lifecycle control*
Timeout, retry, and fallback logic must be implemented repeatedly by users, 
leading to duplicated and error-prone implementations.
 # *Insufficient observability for inference workloads*
Metrics such as batch size, inference latency percentiles, and inflight 
requests are not available.

These limitations make it difficult to use Flink as a first-class streaming 
engine for AI inference pipelines.
----
h2. Proposal

Introduce a new *batch-oriented asynchronous operator abstraction* that 
complements the existing {{AsyncFunction}} API and is optimized for 
high-latency inference workloads.
h3. Key ideas
 * Preserve backward compatibility with {{AsyncFunction}}
 * Add a new optional abstraction for batch-based async execution
 * Provide a reference operator implementation at the runtime level

h3. Proposed API (illustrative)

 
{code:java}
import java.util.List;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

/** Triggers one asynchronous call per buffered batch of input records. */
public interface AsyncBatchFunction<IN, OUT> {

    /** Processes the batch asynchronously and completes the future with the results. */
    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture);
}
{code}
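A minimal implementation sketch, assuming the interface above; {{EmbeddingClient}} and its non-blocking {{embedAll}} call are hypothetical stand-ins for an inference service client, not part of this proposal:
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class EmbeddingBatchFunction implements AsyncBatchFunction<String, float[]> {

    // Hypothetical non-blocking client; in a real job it would be created in an
    // open() lifecycle hook rather than inline.
    private final EmbeddingClient client = new EmbeddingClient("http://model-serving:8080");

    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultFuture<float[]> resultFuture) {
        CompletableFuture<List<float[]>> response = client.embedAll(inputs);
        response.whenComplete((embeddings, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(embeddings);
            }
        });
    }
}
{code}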
h3. Proposed operator
 * {{AsyncBatchWaitOperator}}
 * Buffers incoming records and triggers async calls based on:
 ** batch size
 ** batch time
 * Controls inflight batch concurrency
 * Emits results in unordered mode initially (see the illustrative wiring sketch below)
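
The following is a rough wiring sketch for the operator described above; {{AsyncBatchDataStream.unorderedWaitBatch}}, its parameter names, and {{EmbeddingPipeline}} are hypothetical (loosely modeled on the existing {{AsyncDataStream.unorderedWait}} entry point) and only illustrate the intended knobs: batch size, batch timeout, and inflight batch concurrency.
{code:java}
import java.time.Duration;
import org.apache.flink.streaming.api.datastream.DataStream;

public class EmbeddingPipeline {

    // Hypothetical entry point mirroring AsyncDataStream.unorderedWait; the method
    // and parameter names below do not exist yet and only show the intended knobs.
    public static DataStream<float[]> embed(DataStream<String> requests) {
        return AsyncBatchDataStream.unorderedWaitBatch(
                requests,
                new EmbeddingBatchFunction(),  // batch function sketched above
                32,                            // max batch size: flush once 32 records are buffered
                Duration.ofMillis(50),         // max batch age: flush a partial batch after 50 ms
                4);                            // max inflight batches: caps concurrent inference calls
    }
}
{code}
Starting with unordered emission keeps the operator close to the existing unordered Async I/O path; ordered emission is deliberately deferred to the follow-ups below.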

----
h2. Expected Benefits
 * Improved throughput and resource efficiency for inference workloads
 * Clearer backpressure semantics aligned with model serving constraints
 * Reduced boilerplate for users implementing inference pipelines
 * A solid foundation for future AI-oriented extensions (retry policies, 
fallback strategies, richer metrics)

----
h2. Scope and Compatibility
 * This proposal does *not* modify or deprecate {{AsyncFunction}}
 * The new abstraction is fully optional and additive
 * No changes to existing user code are required

----
h2. Follow-ups (Out of Scope)
 * Ordered batch async processing
 * Retry and fallback policies
 * SQL-level inference integration
 * Python operator support

These topics can be addressed incrementally in follow-up issues.
----
*Attachments / References*
 * Related code: {{AsyncWaitOperator}}, {{AsyncFunction}}
 * Motivation: AI inference and model serving workloads in streaming pipelines

> Introduce an AI-friendly Async Batch Operator for high-latency inference 
> workloads
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-38825
>                 URL: https://issues.apache.org/jira/browse/FLINK-38825
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: featzhang
>            Priority: Major
>             Fix For: 2.0-preview
>
>


