featzhang opened a new pull request, #27356:
URL: https://github.com/apache/flink/pull/27356

   ### [FLINK-38825] Introduce AsyncBatchFunction and AsyncBatchWaitOperator (initial API & size-based batching)
   
   #### Motivation
   
   Many AI / ML inference and external service workloads achieve significantly 
better throughput with **batched** calls than with per-record async invocation. 
Typical examples include:
   
   * GPU-based model inference where batching improves throughput and 
utilization
   * External services that expose batch APIs
   * Databases or RPC systems with high per-request overhead
   
   While Flink currently provides `AsyncFunction` and `AsyncWaitOperator` for 
record-level async I/O, there is no native abstraction for **batch-oriented 
async processing** in the DataStream API.
   
   This PR introduces a **minimal and extensible foundation** for async batch 
processing in Flink.
   
   ---
   
   #### What is included in this PR
   
   This PR intentionally focuses on a **small, additive, and reviewer-friendly 
first step**:
   
   1. **New public API**
   
      * Introduces `AsyncBatchFunction<IN, OUT>` (annotated with 
`@PublicEvolving`)
      * Allows users to implement async I/O over a batch of inputs
   
   2. **New runtime operator**
   
      * Adds `AsyncBatchWaitOperator` with **unordered semantics**
      * Supports **size-based batching only**
      * Flushes remaining records on end-of-input
      * Keeps implementation intentionally simple
   
   3. **Stream API entry point**
   
      * Adds `AsyncDataStream.unorderedWaitBatch(...)`
      * Mirrors existing async API style without modifying existing methods
   
   4. **Unit tests**
   
      * Verifies batch-size triggering behavior
      * Verifies correct result emission
      * Verifies exception propagation and failure behavior
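
The size-based triggering and end-of-input flush described above can be sketched in plain Java, without Flink dependencies. This is only an illustration under stated assumptions: `BatchAsyncFn`, `SizeBatcher`, `invokeBatch`, `processElement`, and `endInput` are hypothetical names chosen for the sketch and are not the signatures introduced by this PR. A real operator would also route completion callbacks back to the task thread, which this sketch does not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in for a batch-oriented async function (not the PR's interface). */
interface BatchAsyncFn<IN, OUT> {
    CompletableFuture<List<OUT>> invokeBatch(List<IN> inputs);
}

/** Buffers records and invokes the batch function once maxBatchSize records are buffered. */
final class SizeBatcher<IN, OUT> {
    private final int maxBatchSize;
    private final BatchAsyncFn<IN, OUT> fn;
    private final Consumer<OUT> output;
    private final Consumer<Throwable> onFailure;
    private List<IN> buffer = new ArrayList<>();

    SizeBatcher(
            int maxBatchSize,
            BatchAsyncFn<IN, OUT> fn,
            Consumer<OUT> output,
            Consumer<Throwable> onFailure) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.fn = fn;
        this.output = output;
        this.onFailure = onFailure;
    }

    /** Adds one record; triggers a batch call when the buffer reaches maxBatchSize. */
    void processElement(IN record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Flushes a partially filled buffer when no more input will arrive. */
    void endInput() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // Hand the current buffer to the function and start a fresh one.
        List<IN> batch = buffer;
        buffer = new ArrayList<>();
        // Unordered: results are emitted whenever a batch completes.
        // Assumption: callbacks run on the completing thread; no thread hand-off here.
        fn.invokeBatch(batch).whenComplete((results, error) -> {
            if (error != null) {
                onFailure.accept(error);
            } else {
                results.forEach(output);
            }
        });
    }
}
```

With a batch size of 2 and five input records, this sketch issues two full batches as records arrive and a third batch holding the single remaining record once `endInput()` is called.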
   
   ---
   
   #### What is intentionally NOT included
   
   To keep this PR minimal and avoid premature design constraints, the 
following are **explicitly out of scope** and will be addressed in follow-up 
PRs:
   
   * Ordered batching semantics
   * Time-based or event-time-based batching
   * Retry / timeout handling at the async invocation level
   * Metrics and backpressure tuning
   * SQL / Table API / Python API integration
   * Multiple in-flight batch concurrency control
   
   ---
   
   #### Design notes
   
   * The new API is **additive and backward-compatible**; no existing behavior 
is modified.
   * The operator is designed to be easy to evolve, with clear extension points 
documented via TODOs.
   * The implementation favors explicit logic over reuse or abstraction, which 
simplifies review and future changes.
   
   ---
   
   #### Follow-up work
   
   Planned follow-up PRs (tracked under FLINK-38825) include:
   
   * Time-based batch triggering
   * Ordered batch mode
   * Event-time batching
   * Retry / timeout strategies
   * Metrics and observability
   * Higher-level API and ecosystem integrations
   
   ---
   
   #### Checklist
   
   * [x] API annotated as `@PublicEvolving`
   * [x] No changes to existing async APIs
   * [x] Fully covered by unit tests
   * [x] Backward-compatible and additive only
   
   This PR establishes the minimal building block for async batch processing in 
Flink and enables iterative enhancement without locking in premature design 
decisions.
   

