featzhang opened a new pull request, #27361:
URL: https://github.com/apache/flink/pull/27361

   ## What is the purpose of the change
   
   This PR adds **Python DataStream API integration** for the existing Java 
`AsyncBatchWaitOperator` runtime capability, enabling Python-based AI/ML 
inference and external service calls to use **batch-oriented async execution**.
   
   This is a **pure integration PR**: all batching, scheduling, and async 
execution logic is reused from the Java side.
   
   ## Brief change log
   
   ### New Python Classes
   
   | File | Description |
   |------|-------------|
   | `AsyncBatchFunction` | Python async batch function interface |
   | `AsyncBatchFunctionDescriptor` | Descriptor for serialization and 
configuration |
   | `AsyncBatchOperation` | Runtime operation for batch async execution |
   | `BatchResultDistributor` | Distributes batch results to individual 
elements |
   
   ### Modified Files
   
   | File | Changes |
   |------|---------|
   | `async_data_stream.py` | Added `unordered_wait_batch()` and 
`ordered_wait_batch()` methods |
   | `functions.py` | Added `AsyncBatchFunction` and 
`AsyncBatchFunctionDescriptor` classes |
   | `__init__.py` | Exported `AsyncBatchFunction` |
   | `flink-fn-execution.proto` | Added `ASYNC_BATCH` function type |
   
   ### Test Files
   
   | File | Description |
   |------|-------------|
   | `test_async_batch_function.py` | Comprehensive tests for batch async 
functionality |
   
   ## API Design
   
   ### AsyncBatchFunction
   
   ```python
   class AsyncBatchFunction(Function, Generic[IN, OUT]):
       """
       A function to trigger Async I/O operation with batch processing support.
       Designed for AI/ML inference scenarios where batching improves 
throughput.
       """
   
       @abstractmethod
       async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
           """
           Trigger async operation for a batch of stream inputs.
           Returns a list of results, one for each input element.
           """
           pass
   
       def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
           """
           Called when async_invoke_batch times out.
           Override to provide custom timeout handling.
           """
           raise TimeoutError("Async batch function call has timed out")
   ```
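   The contract above can be illustrated with a standalone, runnable sketch: the coroutine receives the whole batch and must return exactly one result per input, in matching positions. `DoubleBatchFunction` and its inner `score` coroutine are hypothetical names used for illustration only, not part of this PR:

   ```python
   import asyncio
   from typing import List

   class DoubleBatchFunction:
       async def async_invoke_batch(self, inputs: List[int]) -> List[int]:
           async def score(x: int) -> int:  # stand-in for a real async call
               await asyncio.sleep(0)
               return x * 2
           # Fan out the batch concurrently; gather preserves input order,
           # so result i corresponds to inputs[i].
           return list(await asyncio.gather(*(score(x) for x in inputs)))

   # asyncio.run(DoubleBatchFunction().async_invoke_batch([1, 2, 3])) -> [2, 4, 6]
   ```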
   
   ### AsyncDataStream Methods
   
   ```python
   # Unordered batch execution
   AsyncDataStream.unordered_wait_batch(
       data_stream,
       async_batch_function,
       timeout,             # Overall timeout
       batch_size,          # Max elements per batch
       batch_timeout=None,  # Optional batch flush timeout
       capacity=100,        # Max in-flight operations
       output_type=None     # Output type info
   )
   
   # Ordered batch execution (preserves input order)
   AsyncDataStream.ordered_wait_batch(
       data_stream,
       async_batch_function,
       timeout,
       batch_size,
       batch_timeout=None,
       capacity=100,
       output_type=None
   )
   ```
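   As a standalone sketch (not Flink code), the batch-trigger semantics of `batch_size` and `batch_timeout` can be modeled with an `asyncio.Queue`: a batch is emitted when it reaches `batch_size` elements, or when `batch_timeout` elapses with a partial batch pending, and remaining elements flush at end of input. The actual logic lives in the Java `AsyncBatchWaitOperator`; all names below are illustrative:

   ```python
   import asyncio
   from typing import List, Optional

   async def collect_batches(source: "asyncio.Queue[Optional[int]]",
                             batch_size: int,
                             batch_timeout: float) -> List[List[int]]:
       batches: List[List[int]] = []
       current: List[int] = []
       while True:
           try:
               # Wait for the next element, but never longer than batch_timeout.
               item = await asyncio.wait_for(source.get(), timeout=batch_timeout)
           except asyncio.TimeoutError:
               if current:                 # timeout trigger: flush partial batch
                   batches.append(current)
                   current = []
               continue
           if item is None:                # end of input: flush the remainder
               if current:
                   batches.append(current)
               return batches
           current.append(item)
           if len(current) >= batch_size:  # size trigger: flush full batch
               batches.append(current)
               current = []

   async def demo() -> List[List[int]]:
       q: "asyncio.Queue[Optional[int]]" = asyncio.Queue()
       for i in range(5):
           q.put_nowait(i)
       q.put_nowait(None)                  # sentinel marking end of input
       return await collect_batches(q, batch_size=2, batch_timeout=0.1)

   # asyncio.run(demo()) -> [[0, 1], [2, 3], [4]]
   ```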
   
   ## Example Usage
   
   ```python
   from pyflink.datastream import AsyncDataStream, AsyncBatchFunction
   from pyflink.common import Time, Types, Row
   from typing import List
   
   class MLInferenceFunction(AsyncBatchFunction):
       """Batch ML model inference function.

       Assumes self.model (an async, batch-capable inference client) and
       self.extract_features are initialized elsewhere, e.g. in open().
       """

       async def async_invoke_batch(self, inputs: List[Row]) -> List[float]:
           # One batched inference call covers every element in the batch
           features = [self.extract_features(row) for row in inputs]
           predictions = await self.model.predict_batch(features)
           return predictions.tolist()
   
   # Apply to data stream
   result = AsyncDataStream.unordered_wait_batch(
       ds,
       MLInferenceFunction(),
       timeout=Time.seconds(30),
       batch_size=32,
       batch_timeout=Time.milliseconds(100),
       output_type=Types.FLOAT()
   )
   ```
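   Because the default `timeout_batch` raises and fails the job, a function can override it to degrade gracefully instead. The sketch below stubs the base class so the snippet runs standalone; `FallbackInferenceFunction` and `DEFAULT_SCORE` are illustrative names, and in a real job you would subclass the PyFlink `AsyncBatchFunction` instead:

   ```python
   from typing import List

   class AsyncBatchFunction:  # standalone stub of the PyFlink base class
       def timeout_batch(self, inputs):
           raise TimeoutError("Async batch function call has timed out")

   class FallbackInferenceFunction(AsyncBatchFunction):
       DEFAULT_SCORE = 0.0  # hypothetical neutral prediction

       def timeout_batch(self, inputs: List[dict]) -> List[float]:
           # Emit one fallback score per input instead of failing the job.
           return [self.DEFAULT_SCORE for _ in inputs]

   # FallbackInferenceFunction().timeout_batch([{"x": 1}, {"x": 2}]) -> [0.0, 0.0]
   ```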
   
   ## Testing
   
   The PR includes comprehensive tests covering:
   
   1. **Basic batch execution** - Verify batching and results
   2. **Batch size triggering** - Verify batches trigger at configured size
   3. **Batch timeout triggering** - Verify partial batches flush on timeout
   4. **Exception propagation** - Verify errors fail the job
   5. **Timeout handling** - Verify `timeout_batch` is called
   6. **Ordered execution** - Verify output order matches input
   7. **End-of-input flush** - Verify remaining elements are flushed
   8. **Validation errors** - Verify parameter validation
   
   ## Design Principles
   
   1. **Reuse Java Runtime** - All batching logic is in `AsyncBatchWaitOperator`
   2. **Follow Existing Patterns** - API mirrors `AsyncFunction` integration
   3. **Explicit, Readable Code** - No complex abstractions
   4. **Backward Compatible** - Existing APIs unchanged
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   ```bash
   cd flink-python
   python -m pytest pyflink/datastream/tests/test_async_batch_function.py -v
   ```
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **no**
   - Anything that affects deployment or recovery: **no**
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **yes**
   - If yes, how is the feature documented? **Docstrings**
   
   ---
   
   ## PR Series for FLINK-38825
   
   JIRA: [FLINK-38825](https://issues.apache.org/jira/browse/FLINK-38825) - 
Introduce an AI-friendly Async Batch Operator for high-latency inference 
workloads
   
   This feature is implemented incrementally through the following PR series:
   
   | # | PR | Title | Description | Module |
   |---|---|---|---|---|
   | 1 | [#27355](https://github.com/apache/flink/pull/27355) | Introduce 
AsyncBatchFunction and AsyncBatchWaitOperator | Core API and runtime operator 
with unordered semantics and size-based batch triggering | 
`flink-streaming-java` |
   | 2 | [#27356](https://github.com/apache/flink/pull/27356) | Add time-based 
batch triggering | Timeout-based flush to trigger incomplete batches after a 
configurable duration | `flink-streaming-java` |
   | 3 | [#27357](https://github.com/apache/flink/pull/27357) | Add ordered 
async batch support | Ordered output semantics via 
OrderedAsyncBatchWaitOperator with sequence numbers | `flink-streaming-java` |
   | 4 | [#27358](https://github.com/apache/flink/pull/27358) | Add 
inference-oriented metrics | Batch size/latency histograms, async call 
duration, inflight batches, failure counters | `flink-streaming-java` |
   | 5 | [#27359](https://github.com/apache/flink/pull/27359) | Implement retry 
and timeout strategies | Fixed-delay/exponential-backoff retry, 
fail-on-timeout/allow-partial timeout policies | `flink-streaming-java` |
   | 6 | [#27360](https://github.com/apache/flink/pull/27360) | Add SQL/Table 
API integration | AsyncBatchLookupFunction, AsyncBatchLookupFunctionProvider, 
lookup join runner | `flink-table` |
   | **7** | [#27361](https://github.com/apache/flink/pull/27361) | Add Python 
DataStream API integration | Python AsyncBatchFunction, async_invoke_batch() 
API, AsyncDataStream batch methods | **`flink-python`** |
   
   > **Note**: Each PR builds incrementally on the previous ones. PRs should be 
reviewed and merged in order.
   

