featzhang opened a new pull request, #27355:
URL: https://github.com/apache/flink/pull/27355
### Motivation
Flink’s existing `AsyncFunction` invokes the user function once per record, which is
suboptimal for **high-latency inference workloads** such as machine learning model
serving. In those workloads, **batching requests** can significantly improve throughput and
resource utilization (e.g., for GPU-backed or remote inference services).
Currently, users have to implement batching logic outside of Flink or embed
complex buffering logic inside a single-record `AsyncFunction`, which is
error-prone and hard to evolve.
This PR introduces a **minimal, batch-oriented async abstraction** as a
foundation for future AI / inference-related extensions.
---
### What is proposed in this PR
This PR adds a **small, additive, and backward-compatible** set of building
blocks:
1. **`AsyncBatchFunction` (PublicEvolving API)**
   A new async function interface that processes a *batch of input
   elements* in a single async invocation.
2. **`AsyncBatchWaitOperator` (runtime operator)**
   A minimal unordered async batch operator that:
   - Buffers incoming records
   - Triggers an async invocation when `maxBatchSize` is reached
   - Emits results as soon as the async call completes
   - Flushes remaining buffered elements on end-of-input
3. **Stream API entry point**
   A new `AsyncDataStream.unorderedWaitBatch(...)` method that wires the
   operator into the DataStream API.
4. **Unit tests**
   Tests validating batch triggering, result emission, and exception
   propagation.
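The following is a minimal, Flink-independent sketch of the buffering semantics in item 2. It assumes a hypothetical `BatchAsyncFn` interface whose `invokeBatch` method takes a `List` of inputs and returns a `CompletableFuture<List<OUT>>`. The PR's actual `AsyncBatchFunction` signature may differ; for instance, it may use a `ResultFuture` callback like `AsyncFunction`. `BatchingSketch` and `UnorderedBatcher` are also illustrative names. `processElement` and `endInput` only mirror the operator lifecycle, and this class is not the `AsyncBatchWaitOperator` implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class BatchingSketch {

    /** Hypothetical batch-oriented async function; NOT the PR's actual AsyncBatchFunction. */
    interface BatchAsyncFn<IN, OUT> {
        CompletableFuture<List<OUT>> invokeBatch(List<IN> batch);
    }

    /**
     * Hypothetical model of an unordered async batch operator: buffer inputs,
     * fire when maxBatchSize is reached, emit each batch's results when its
     * future completes, and flush the remainder on end-of-input.
     */
    static final class UnorderedBatcher<IN, OUT> {
        private final BatchAsyncFn<IN, OUT> fn;
        private final int maxBatchSize;
        private final Consumer<OUT> emitter; // must be thread-safe: futures may complete on other threads
        private final List<IN> buffer = new ArrayList<>();
        private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

        UnorderedBatcher(BatchAsyncFn<IN, OUT> fn, int maxBatchSize, Consumer<OUT> emitter) {
            if (maxBatchSize <= 0) {
                throw new IllegalArgumentException("maxBatchSize must be positive");
            }
            this.fn = fn;
            this.maxBatchSize = maxBatchSize;
            this.emitter = emitter;
        }

        /** Buffers one record and fires a batch once the size threshold is hit. */
        void processElement(IN value) {
            buffer.add(value);
            if (buffer.size() >= maxBatchSize) {
                fireBatch();
            }
        }

        /** Flushes any partial batch, then waits for all in-flight calls. */
        void endInput() {
            if (!buffer.isEmpty()) {
                fireBatch();
            }
            // join() rethrows a batch failure wrapped in a CompletionException.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
        }

        private void fireBatch() {
            List<IN> batch = new ArrayList<>(buffer);
            buffer.clear();
            // Unordered: each batch emits as soon as it completes, regardless of submission order.
            inFlight.add(fn.invokeBatch(batch).thenAccept(results -> results.forEach(emitter)));
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = Collections.synchronizedList(new ArrayList<>());
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        // Stand-in for a remote inference call that scores a whole batch at once.
        BatchAsyncFn<Integer, String> model = batch -> {
            batchSizes.add(batch.size());
            return CompletableFuture.supplyAsync(() -> {
                List<String> preds = new ArrayList<>();
                for (int x : batch) {
                    preds.add("pred-" + x);
                }
                return preds;
            });
        };
        UnorderedBatcher<Integer, String> op = new UnorderedBatcher<>(model, 3, out::add);
        for (int i = 0; i < 7; i++) {
            op.processElement(i);
        }
        op.endInput();
        System.out.println(batchSizes); // [3, 3, 1]
        System.out.println(out.size()); // 7
    }
}
```

Each batch emits from its own `thenAccept` callback. As a result, a later batch's results can reach the emitter before those of an earlier batch, which is what "unordered" means here. In a real Flink operator, completions would be handed back to the task thread (for example, via the mailbox) instead of invoking the emitter from arbitrary threads. That is why this sketch requires a thread-safe emitter.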
---
### Scope and intentional limitations
This PR is intentionally **minimal** and does **not** include:
- Ordered output mode
- Time-based batching or timers
- Retry / timeout handling
- Metrics
- SQL / Table API integration
- Python integration
- Any changes to existing `AsyncFunction` or `AsyncWaitOperator`
These aspects are expected to be explored incrementally in follow-up PRs
once the core abstraction is agreed upon.
---
### Why this approach
- Keeps the initial API surface small and reviewable
- Avoids coupling with existing async internals
- Makes future extensions (timers, retries, Python, SQL) explicit and
incremental
- Aligns with Flink’s philosophy of evolving APIs through small, focused
changes
---
### Follow-up work (out of scope for this PR)
Potential next steps include:
- Ordered batch async operator
- Time-based batch triggering
- Async retry and timeout strategies
- Metrics and backpressure awareness
- Python / ML inference integrations
- SQL/Table API exposure
---
### Compatibility
- Fully backward-compatible
- Purely additive changes
- No behavior changes to existing async operators