featzhang opened a new pull request, #27355:
URL: https://github.com/apache/flink/pull/27355

   ### Motivation
   
   Flink’s existing `AsyncFunction` processes records one at a time, which is 
suboptimal for **high-latency inference workloads** such as machine learning model 
serving (e.g. GPU or remote inference services), where **batching requests** can 
significantly improve throughput and resource utilization.
   
   Currently, users have to implement batching logic outside of Flink or embed 
complex buffering logic inside a single-record `AsyncFunction`, which is 
error-prone and hard to evolve.
   
   This PR introduces a **minimal, batch-oriented async abstraction** as a 
foundation for future AI / inference-related extensions.
   
   ---
   
   ### What is proposed in this PR
   
   This PR adds a **small, additive, and backward-compatible** set of building 
blocks:
   
   1. **`AsyncBatchFunction` (PublicEvolving API)**  
      A new async function interface that allows processing a *batch of input 
elements* in a single async invocation (a rough usage sketch follows this list).
   
   2. **`AsyncBatchWaitOperator` (runtime operator)**  
      A minimal unordered async batch operator that:
      - Buffers incoming records
      - Triggers async invocation when `maxBatchSize` is reached
      - Emits results as soon as the async call completes
      - Flushes remaining elements on end-of-input
   
   3. **Stream API entry point**  
      A new `AsyncDataStream.unorderedWaitBatch(...)` method to wire the 
operator in the DataStream API.
   
   4. **Unit tests**  
      Tests validating batch triggering, result emission, and exception 
propagation.
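   
   To make the intended usage concrete, here is a rough sketch. The names 
`AsyncBatchFunction` and `AsyncDataStream.unorderedWaitBatch` are taken from this 
PR, but the method name `asyncInvokeBatch`, the exact signatures, and the 
`InferenceClient` model-serving client are illustrative assumptions and may differ 
from the actual code:
   
   ```java
   // Assumed shape of the new interface (the method name below is a guess):
   //   void asyncInvokeBatch(List<IN> batch, ResultFuture<OUT> resultFuture) throws Exception;

   import java.util.List;

   import org.apache.flink.streaming.api.functions.async.ResultFuture;

   /** A user function that scores a whole batch of requests with a single async call. */
   public class BatchScoringFunction implements AsyncBatchFunction<String, Float> {

       // Hypothetical async model-serving client; its scoreBatch(...) is assumed
       // to return a CompletableFuture<List<Float>>.
       private transient InferenceClient client;

       @Override
       public void asyncInvokeBatch(List<String> batch, ResultFuture<Float> resultFuture) {
           // One remote call covers the entire buffered batch.
           client.scoreBatch(batch).whenComplete((scores, error) -> {
               if (error != null) {
                   resultFuture.completeExceptionally(error);
               } else {
                   // Results are emitted as soon as the async call completes (unordered mode).
                   resultFuture.complete(scores);
               }
           });
       }
   }
   ```
   
   Wiring it into a DataStream pipeline (`env` is a `StreamExecutionEnvironment`; 
the parameter list of `unorderedWaitBatch` is illustrative, with `32` standing in 
for `maxBatchSize`):
   
   ```java
   DataStream<String> requests = env.fromElements("req-1", "req-2", "req-3");

   // maxBatchSize = 32: the operator buffers up to 32 records before firing one
   // async call; remaining buffered elements are flushed on end-of-input.
   DataStream<Float> scores =
       AsyncDataStream.unorderedWaitBatch(requests, new BatchScoringFunction(), 32);
   ```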
   
   ---
   
   ### Scope and intentional limitations
   
   This PR is intentionally **minimal** and does **not** include:
   
   - Ordered output mode
   - Time-based batching or timers
   - Retry / timeout handling
   - Metrics
   - SQL / Table API integration
   - Python integration
   - Any changes to existing `AsyncFunction` or `AsyncWaitOperator`
   
   These aspects are expected to be explored incrementally in follow-up PRs 
once the core abstraction is agreed upon.
   
   ---
   
   ### Why this approach
   
   - Keeps the initial API surface small and reviewable
   - Avoids coupling with existing async internals
   - Makes future extensions (timers, retries, Python, SQL) explicit and 
incremental
   - Aligns with Flink’s philosophy of evolving APIs through small, focused 
changes
   
   ---
   
   ### Follow-up work (out of scope for this PR)
   
   Potential next steps include:
   - Ordered batch async operator
   - Time-based batch triggering
   - Async retry and timeout strategies
   - Metrics and backpressure awareness
   - Python / ML inference integrations
   - SQL/Table API exposure
   
   ---
   
   ### Compatibility
   
   - Fully backward-compatible
   - Purely additive changes
   - No behavior changes to existing async operators

