featzhang opened a new pull request, #27359:
URL: https://github.com/apache/flink/pull/27359
## What is the purpose of the change
This PR adds **retry and timeout strategies** to `AsyncBatchWaitOperator` to
support AI/ML inference workloads that require:
- **Retry on transient failures** - Automatically retry failed batch async
operations with configurable backoff strategies
- **Timeout handling** - Prevent indefinite waiting for async operations
with configurable timeout behavior (fail or allow partial results)
These features are essential for production AI inference pipelines where:
- External model serving endpoints may experience transient failures
- Network issues or service degradation require graceful retry mechanisms
- Hard timeout limits are needed to prevent unbounded latency
## Brief change log
### New Classes
| Class | Description |
|-------|-------------|
| `AsyncBatchRetryStrategy<OUT>` | Interface defining retry strategy for batch operations |
| `AsyncBatchRetryPredicate<OUT>` | Interface for predicates that determine when to retry |
| `AsyncBatchTimeoutPolicy` | Configuration for batch-level timeout behavior |
| `AsyncBatchRetryStrategies` | Utility class with built-in retry strategies |
### Retry Strategies Provided
| Strategy | Description |
|----------|-------------|
| `NoRetryStrategy` | Never retries (default behavior) |
| `FixedDelayRetryStrategy` | Retries with fixed delay between attempts |
| `ExponentialBackoffDelayRetryStrategy` | Retries with exponentially increasing delays |
### Timeout Policies Provided
| Policy | Description |
|--------|-------------|
| `NO_TIMEOUT_POLICY` | Timeout disabled (default) |
| `failOnTimeout(Duration)` | Fail the operator when timeout occurs |
| `allowPartialOnTimeout(Duration)` | Allow partial results when timeout occurs |
### Operator Changes
- Modified `AsyncBatchWaitOperator` to support:
  - Retry logic based on exception or result predicates
  - Timeout tracking with processing time timers
  - Metrics for retry and timeout events (`batchRetryCount`, `batchTimeoutCount`)
### API Changes
- Added new overloads to `AsyncDataStream`:
  - `unorderedWaitBatch(DataStream, AsyncBatchFunction, int, long, AsyncBatchRetryStrategy, AsyncBatchTimeoutPolicy)` - Full configuration
  - `unorderedWaitBatchWithRetry(...)` - Retry support only
  - `unorderedWaitBatchWithTimeout(...)` - Timeout support only
## Example Usage
### Fixed Delay Retry
```java
// Retry up to 3 times with 100ms delay on IOException
AsyncBatchRetryStrategy<String> retryStrategy =
    new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 100L)
        .ifException(e -> e instanceof IOException)
        .build();

AsyncDataStream.unorderedWaitBatchWithRetry(
    inputStream,
    batchFunction,
    maxBatchSize,
    batchTimeoutMs,
    retryStrategy
);
```
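
To make the intended semantics concrete, here is a minimal, self-contained sketch of fixed-delay retry driven by an exception predicate. It is not the operator's implementation: `FixedDelayRetrySketch` and `callWithRetry` are hypothetical names, the loop blocks the calling thread (the operator would schedule retries asynchronously), and it assumes "retry up to 3 times" means three retries after the initial call.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical illustration only; not part of the PR or of Flink. */
final class FixedDelayRetrySketch {

    /**
     * Runs the call, retrying at most maxRetries times with a fixed delay,
     * but only when the thrown exception matches the predicate.
     */
    static <T> T callWithRetry(
            Callable<T> call, int maxRetries, long delayMs, Predicate<Throwable> retryOn)
            throws Exception {
        int retries = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up when retries are exhausted or the exception does not match.
                if (retries >= maxRetries || !retryOn.test(e)) {
                    throw e;
                }
                retries++;
                Thread.sleep(delayMs); // blocking stand-in for a scheduled retry
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(
                () -> {
                    // Fail twice with a transient error, then succeed.
                    if (++calls[0] < 3) {
                        throw new IOException("transient failure");
                    }
                    return "ok";
                },
                3,
                100L,
                e -> e instanceof IOException);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

An exception that does not match the predicate (for example an `IllegalStateException` here) is rethrown immediately, without any retry.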
### Exponential Backoff Retry
```java
// Retry up to 5 times: initial 100ms, max 10s, multiplier 2.0
AsyncBatchRetryStrategy<String> retryStrategy =
    new AsyncBatchRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<String>(
            5, 100L, 10000L, 2.0)
        .ifException(e -> e instanceof TimeoutException)
        .build();
```
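
For reference, the delay schedule these parameters imply can be worked out with a few lines of plain Java. This is a sketch under assumptions, not the strategy's actual code: `BackoffDelaySketch`, `delayMs`, and `schedule` are hypothetical names, and the formula `initial * multiplier^(attempt - 1)`, capped at the maximum, is the conventional definition of exponential backoff rather than something confirmed by the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of the PR or of Flink. */
final class BackoffDelaySketch {

    /** Delay before retry number {@code attempt} (1-based), capped at maxDelayMs. */
    static long delayMs(int attempt, long initialDelayMs, long maxDelayMs, double multiplier) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double delay = initialDelayMs * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    /** Delays for retries 1..maxAttempts. */
    static List<Long> schedule(
            int maxAttempts, long initialDelayMs, long maxDelayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            delays.add(delayMs(attempt, initialDelayMs, maxDelayMs, multiplier));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same numbers as the example above: 5 attempts, 100ms initial, 10s cap, x2.0
        System.out.println(schedule(5, 100L, 10_000L, 2.0)); // prints [100, 200, 400, 800, 1600]
    }
}
```

With these values the 10s cap is never reached within five attempts; under this formula it would first apply at the eighth retry (12800ms capped to 10000ms).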
### Timeout Configuration
```java
// Fail after 5 seconds
AsyncBatchTimeoutPolicy timeoutPolicy =
    AsyncBatchTimeoutPolicy.failOnTimeout(Duration.ofSeconds(5));

// Or, alternatively, allow partial results after 10 seconds
// AsyncBatchTimeoutPolicy timeoutPolicy =
//     AsyncBatchTimeoutPolicy.allowPartialOnTimeout(Duration.ofSeconds(10));

AsyncDataStream.unorderedWaitBatchWithTimeout(
    inputStream,
    batchFunction,
    maxBatchSize,
    batchTimeoutMs,
    timeoutPolicy
);
```
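
The difference between the two timeout behaviors can be illustrated outside Flink with plain `CompletableFuture`s. This is a rough sketch, not the operator's logic: `TimeoutBehaviorSketch` and `await` are hypothetical names, and it assumes "partial results" means emitting whatever has completed successfully by the deadline, which the PR description does not spell out.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration only; not part of the PR or of Flink. */
final class TimeoutBehaviorSketch {

    /**
     * Waits up to the timeout for all futures. On timeout, either rethrows
     * (fail behavior) or returns the results that are already available
     * (allow-partial behavior).
     */
    static <T> List<T> await(
            List<CompletableFuture<T>> futures, Duration timeout, boolean allowPartial)
            throws TimeoutException, InterruptedException, ExecutionException {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            all.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (!allowPartial) {
                throw e; // fail-on-timeout
            }
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            // Keep only futures that finished successfully before the deadline.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                results.add(future.join());
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> done = CompletableFuture.completedFuture("a");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes
        System.out.println(await(List.of(done, pending), Duration.ofMillis(50), true));
    }
}
```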
### Combined Retry and Timeout
```java
AsyncDataStream.unorderedWaitBatch(
    inputStream,
    batchFunction,
    maxBatchSize,
    batchTimeoutMs,
    retryStrategy,
    timeoutPolicy
);
```
## Verifying this change
This change added tests and can be verified as follows:
### Retry Tests
- `AsyncBatchRetryAndTimeoutTest#testRetryWithFixedDelay` - Verifies fixed
delay retry works correctly
- `AsyncBatchRetryAndTimeoutTest#testRetryWithExponentialBackoff` - Verifies
exponential backoff retry
- `AsyncBatchRetryAndTimeoutTest#testRetryOnResultPredicate` - Verifies
retry based on result predicate
- `AsyncBatchRetryAndTimeoutTest#testRetryExhausted` - Verifies failure
after max attempts exhausted
- `AsyncBatchRetryAndTimeoutTest#testNoRetryForNonMatchingException` -
Verifies non-matching exceptions are not retried
### Timeout Tests
- `AsyncBatchRetryAndTimeoutTest#testTimeoutWithFailBehavior` - Verifies
fail-on-timeout behavior
- `AsyncBatchRetryAndTimeoutTest#testTimeoutWithAllowPartialBehavior` -
Verifies allow-partial-on-timeout behavior
- `AsyncBatchRetryAndTimeoutTest#testCompletionBeforeTimeout` - Verifies
normal completion before timeout
### Combined Tests
- `AsyncBatchRetryAndTimeoutTest#testTimeoutCancelsRetry` - Verifies timeout
cancels pending retry
- `AsyncBatchRetryAndTimeoutTest#testRetrySucceedsBeforeTimeout` - Verifies
retry succeeds before timeout
## Does this pull request potentially affect one of the following parts
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **yes** (new `@PublicEvolving` interfaces)
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes**
(retry/timeout tracking in batch processing)
- Anything that affects deployment or recovery: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **JavaDocs**
## Design Decisions
### 1. Strategy Pattern for Retry
We use the Strategy pattern to allow flexible retry configuration, following
the existing `AsyncRetryStrategy` design in Flink.
### 2. Timeout via ProcessingTimeService
Timeout is implemented using Flink's `ProcessingTimeService` timers,
ensuring integration with Flink's time handling.
### 3. Atomic State Management
The `BatchResultHandler` uses `AtomicBoolean` flags to prevent race
conditions between timeout, retry, and normal completion.
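
The general "first outcome wins" technique behind such flags can be sketched as follows. This is an assumption-laden illustration rather than the actual `BatchResultHandler`: the class `FirstWinsHandlerSketch` and its methods are hypothetical, and only the use of `AtomicBoolean.compareAndSet` as the guard is taken from the description above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration only; not the PR's BatchResultHandler. */
final class FirstWinsHandlerSketch {

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<String> outcome = new AtomicReference<>();

    /** Called when the async call returns; ignored if another outcome already won. */
    boolean complete(String result) {
        return finish("result:" + result);
    }

    /** Called when the timeout timer fires; ignored if another outcome already won. */
    boolean timeout() {
        return finish("timeout");
    }

    private boolean finish(String value) {
        // compareAndSet succeeds for exactly one caller, even under concurrency.
        if (completed.compareAndSet(false, true)) {
            outcome.set(value);
            return true;
        }
        return false;
    }

    String outcome() {
        return outcome.get();
    }

    public static void main(String[] args) {
        FirstWinsHandlerSketch handler = new FirstWinsHandlerSketch();
        System.out.println(handler.timeout());       // prints true
        System.out.println(handler.complete("late")); // prints false
        System.out.println(handler.outcome());       // prints timeout
    }
}
```

Whichever of the timeout, a retry, or normal completion reaches the guard first decides the outcome; later arrivals see the flag already set and become no-ops.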
### 4. Metrics Integration
Added `batchRetryCount` and `batchTimeoutCount` counters for monitoring
retry and timeout events.
## Future Work (TODOs in code)
- Event-time based batching support
- Ordered batch operations with retry/timeout
- Circuit breaker pattern for repeated failures
- Retry with different batch splitting strategies