featzhang opened a new pull request, #27356:
URL: https://github.com/apache/flink/pull/27356
### [FLINK-38825] Introduce AsyncBatchFunction and AsyncBatchWaitOperator (initial API & size-based batching)
#### Motivation
Many AI / ML inference and external service workloads benefit significantly
from **batching** rather than per-record async invocation. Typical examples
include:
* GPU-based model inference where batching improves throughput and
utilization
* External services that expose batch APIs
* Databases or RPC systems with high per-request overhead
While Flink currently provides `AsyncFunction` and `AsyncWaitOperator` for
record-level async I/O, there is no native abstraction for **batch-oriented
async processing** in the DataStream API.
This PR introduces a **minimal and extensible foundation** for async batch
processing in Flink.
---
#### What is included in this PR
This PR intentionally focuses on a **small, additive, and reviewer-friendly
first step**:
1. **New public API**
* Introduces `AsyncBatchFunction<IN, OUT>` (annotated with
`@PublicEvolving`)
* Allows users to implement async I/O over a batch of inputs
2. **New runtime operator**
* Adds `AsyncBatchWaitOperator` with **unordered semantics**
* Supports **size-based batching only**
* Flushes remaining records on end-of-input
* Keeps implementation intentionally simple
3. **Stream API entry point**
* Adds `AsyncDataStream.unorderedWaitBatch(...)`
* Mirrors existing async API style without modifying existing methods
4. **Unit tests**
* Verifies batch-size triggering behavior
* Verifies correct result emission
* Verifies exception propagation and failure behavior
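The behavior listed above (size-based triggering, flush on end-of-input, failure propagation) can be illustrated with a small standalone sketch. It does not use Flink classes and is not the code from this PR: `BatchFn` is a hypothetical stand-in for `AsyncBatchFunction<IN, OUT>`, `SizeBatcher` is a hypothetical stand-in for the operator's buffering logic, and the method names `invokeBatch`, `processElement`, and `endInput` are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class AsyncBatchSketch {

    /** Hypothetical stand-in for AsyncBatchFunction<IN, OUT>; not the PR's actual signature. */
    interface BatchFn<IN, OUT> {
        CompletableFuture<List<OUT>> invokeBatch(List<IN> inputs);
    }

    /** Hypothetical size-based batcher that mimics the operator behavior described above. */
    static final class SizeBatcher<IN, OUT> {
        private final BatchFn<IN, OUT> fn;
        private final int maxBatchSize;
        private final Consumer<OUT> output;
        private final List<IN> buffer = new ArrayList<>();
        private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

        SizeBatcher(BatchFn<IN, OUT> fn, int maxBatchSize, Consumer<OUT> output) {
            if (maxBatchSize <= 0) {
                throw new IllegalArgumentException("maxBatchSize must be positive");
            }
            this.fn = fn;
            this.maxBatchSize = maxBatchSize;
            this.output = output;
        }

        /** Buffers one record and triggers an async call once the batch is full. */
        void processElement(IN value) {
            buffer.add(value);
            if (buffer.size() >= maxBatchSize) {
                flush();
            }
        }

        /** Flushes the remaining records, then waits for every in-flight batch. */
        void endInput() {
            if (!buffer.isEmpty()) {
                flush();
            }
            for (CompletableFuture<Void> f : inFlight) {
                f.join(); // rethrows a batch failure as CompletionException
            }
            inFlight.clear();
        }

        private void flush() {
            List<IN> batch = new ArrayList<>(buffer);
            buffer.clear();
            // Unordered: results are emitted whenever a batch completes.
            // The output consumer must be thread-safe if futures complete on other threads.
            inFlight.add(fn.invokeBatch(batch).thenAccept(results -> results.forEach(output)));
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // Already-completed futures keep this demo single-threaded.
        BatchFn<Integer, String> fn = batch -> CompletableFuture.completedFuture(
                batch.stream().map(i -> "r" + i).collect(Collectors.toList()));
        SizeBatcher<Integer, String> batcher = new SizeBatcher<Integer, String>(fn, 2, out::add);
        for (int i = 1; i <= 5; i++) {
            batcher.processElement(i);
        }
        batcher.endInput(); // flushes the final partial batch containing 5
        System.out.println(out);
    }
}
```

With a batch size of 2 and five inputs, the sketch issues two full batches and one partial batch at end-of-input. The real operator additionally has to integrate with Flink's mailbox threading and checkpointing, which this sketch deliberately ignores.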
---
#### What is intentionally NOT included
To keep this PR minimal and avoid premature design constraints, the
following are **explicitly out of scope** and will be addressed in follow-up
PRs:
* Ordered batching semantics
* Time-based or event-time-based batching
* Retry / timeout handling at the async invocation level
* Metrics and backpressure tuning
* SQL / Table API / Python API integration
* Concurrency control for multiple in-flight batches
---
#### Design notes
* The new API is **additive and backward-compatible**; no existing behavior
is modified.
* The operator is designed to be easy to evolve, with clear extension points
documented via TODOs.
* The implementation favors explicit logic over reuse or abstraction, to
simplify review and future changes.
---
#### Follow-up work
Planned follow-up PRs (tracked under FLINK-38825) include:
* Time-based batch triggering
* Ordered batch mode
* Event-time batching
* Retry / timeout strategies
* Metrics and observability
* Higher-level API and ecosystem integrations
---
#### Checklist
* [x] API annotated as `@PublicEvolving`
* [x] No changes to existing async APIs
* [x] Covered by unit tests (batch-size triggering, result emission, failure propagation)
* [x] Backward-compatible and additive only
This PR establishes the minimal building block for async batch processing in
Flink and enables iterative enhancement without locking in premature design
decisions.