featzhang opened a new pull request, #27360:
URL: https://github.com/apache/flink/pull/27360
## What is the purpose of the change
This PR adds **SQL/Table API integration** for the existing
`AsyncBatchWaitOperator` runtime capability, enabling batch-oriented async
lookup joins for AI/ML inference scenarios.
This PR builds on previous PRs that introduced:

- `AsyncBatchFunction` and `AsyncBatchWaitOperator` (size/time-based batching)
- Retry and timeout strategies
- Comprehensive metrics

It bridges the gap between that streaming runtime and the SQL/Table API layer.
## Brief change log
### New API Classes (flink-table-common)
| File | Description |
|------|-------------|
| `AsyncBatchLookupFunction.java` | New base class for async lookup functions that process keys in batches |
| `AsyncBatchLookupFunctionProvider.java` | Provider that creates batch lookup functions and carries the max batch size and batch timeout |
### New Runtime Classes (flink-table-runtime)
| File | Description |
|------|-------------|
| `AsyncBatchLookupJoinRunner.java` | Batch-oriented async lookup join runner |
| `AsyncBatchLookupJoinFunctionAdapter.java` | Adapter bridging the Table API to the streaming `AsyncBatchFunction` |
### Modified Classes
| File | Description |
|------|-------------|
| `FunctionKind.java` | Added `ASYNC_BATCH_TABLE` enum value |
| `LookupJoinUtil.java` | Added batch async lookup detection and options extraction |
## API Design
### AsyncBatchLookupFunction
```java
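import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;

@PublicEvolving
public abstract class AsyncBatchLookupFunction extends AsyncTableFunction<RowData> {
    // Primary batch lookup method
    public abstract CompletableFuture<Collection<RowData>> asyncLookupBatch(List<RowData> keyRows);

    // Single-key lookup delegates to batch (as a one-element batch)
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        return asyncLookupBatch(Collections.singletonList(keyRow));
    }
}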
```
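The comment in the class above says single-key lookups delegate to the batch method. The following dependency-free sketch illustrates that contract in isolation; `BatchLookup` and `UpperCaseLookup` are hypothetical stand-ins (generic keys and `String` results instead of `RowData`), not classes from this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AsyncBatchLookupFunction, generic over key and result types.
abstract class BatchLookup<K, V> {
    // Primary entry point: look up all keys of a batch at once.
    public abstract CompletableFuture<Collection<V>> asyncLookupBatch(List<K> keys);

    // Single-key lookup is expressed as a batch containing exactly one key.
    public CompletableFuture<Collection<V>> asyncLookup(K key) {
        return asyncLookupBatch(Collections.singletonList(key));
    }
}

// Toy implementation: "looks up" each key by upper-casing it.
class UpperCaseLookup extends BatchLookup<String, String> {
    int batchCalls = 0; // counts invocations of the batch method

    @Override
    public CompletableFuture<Collection<String>> asyncLookupBatch(List<String> keys) {
        batchCalls++;
        List<String> out = new ArrayList<>(keys.size());
        for (String k : keys) {
            out.add(k.toUpperCase(Locale.ROOT));
        }
        return CompletableFuture.completedFuture(out);
    }
}
```

Treating a single key as a batch of one keeps a single code path for implementors, at the cost of a small list allocation per single-key call.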
### AsyncBatchLookupFunctionProvider
```java
import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;

@PublicEvolving
public interface AsyncBatchLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
    // Factory methods with configuration (signatures only; bodies omitted)
    static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func);
    static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func, int maxBatchSize);
    static AsyncBatchLookupFunctionProvider of(
            AsyncBatchLookupFunction func, int maxBatchSize, Duration timeout);

    // Function factory and configuration getters
    AsyncBatchLookupFunction createAsyncBatchLookupFunction();
    int getMaxBatchSize();
    Duration getBatchTimeout();
}
```
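`getMaxBatchSize()` and `getBatchTimeout()` presumably parameterize the size/time-based batching performed by `AsyncBatchWaitOperator`. As a rough illustration only, the single-threaded sketch below assumes a batch is flushed when it reaches `maxBatchSize` elements or when `batchTimeout` has elapsed since its first buffered element. `BatchBuffer`, its validation rules, and its explicit millisecond timestamps are hypothetical; the operator's actual buffering and timer logic is not shown in this PR.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical size/time-based batch buffer. Timestamps are passed in explicitly
// (milliseconds) so the policy is deterministic and easy to test.
final class BatchBuffer<T> {
    private final int maxBatchSize;
    private final long timeoutMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1; // -1 means the buffer is empty

    BatchBuffer(int maxBatchSize, Duration batchTimeout) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        if (batchTimeout.isNegative() || batchTimeout.isZero()) {
            throw new IllegalArgumentException("batchTimeout must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.timeoutMillis = batchTimeout.toMillis();
    }

    // Adds an element; returns the full batch if the size limit was reached, else null.
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        return buffer.size() >= maxBatchSize ? flush() : null;
    }

    // Called on timer ticks; returns the pending batch if it has waited long enough, else null.
    List<T> onTimer(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - firstElementTime >= timeoutMillis;
        return expired ? flush() : null;
    }

    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstElementTime = -1;
        return batch;
    }
}
```

Passing the current time in explicitly keeps the policy deterministic; inside an operator, the timeout check would instead be driven by a registered timer.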
## Example Usage
### Implementing a Batch Lookup Source
```java
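import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class MyBatchLookupTableSource implements LookupTableSource {
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        return AsyncBatchLookupFunctionProvider.of(
                new MyBatchInferenceFunction(),
                32,                     // maxBatchSize
                Duration.ofMillis(100)  // batchTimeout
        );
    }

    @Override
    public DynamicTableSource copy() { return new MyBatchLookupTableSource(); }
    @Override
    public String asSummaryString() { return "MyBatchLookupTableSource"; }
}

// MyBatchInferenceFunction.java (same package and imports)
// Hypothetical model-serving client; not part of Flink.
interface ModelService { List<float[]> batchPredict(List<float[]> features); }

public class MyBatchInferenceFunction extends AsyncBatchLookupFunction {
    private transient ModelService modelService; // initialization (e.g. in open()) omitted

    @Override
    public CompletableFuture<Collection<RowData>> asyncLookupBatch(List<RowData> keyRows) {
        return CompletableFuture.supplyAsync(() -> {
            // Batch ML inference call: one request for all keys in the batch
            List<float[]> features = keyRows.stream()
                    .map(this::extractFeatures)
                    .collect(Collectors.toList());
            List<float[]> predictions = modelService.batchPredict(features);
            // predictions.get(i) belongs to keyRows.get(i)
            return IntStream.range(0, keyRows.size())
                    .mapToObj(i -> createResultRow(keyRows.get(i), predictions.get(i)))
                    .collect(Collectors.toList());
        });
    }

    // Key row is (feature_id INT); result row is (feature_id INT, prediction FLOAT).
    private float[] extractFeatures(RowData keyRow) { return new float[] {keyRow.getInt(0)}; }
    private RowData createResultRow(RowData keyRow, float[] prediction) {
        return GenericRowData.of(keyRow.getInt(0), prediction[0]);
    }
}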
```
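The example calls `CompletableFuture.supplyAsync` without an executor, so by default the batch inference runs on `ForkJoinPool.commonPool()`, which is shared across the JVM; a blocking remote call there can occupy threads that other tasks rely on. A common alternative is to pass a dedicated executor. The plain-Java sketch below shows that pattern together with the index-based pairing of keys and predictions, and fails the future if the model returns the wrong number of results. `BatchInference` and its `model` function are hypothetical stand-ins for `MyBatchInferenceFunction` and `modelService.batchPredict`, using `Float` keys and `String` results instead of `RowData`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical stand-in for MyBatchInferenceFunction: Float keys and
// "key=prediction" String results replace RowData.
final class BatchInference {
    private final Function<List<Float>, List<Float>> model; // stand-in for batchPredict
    private final Executor executor; // dedicated pool for the blocking model call

    BatchInference(Function<List<Float>, List<Float>> model, Executor executor) {
        this.model = model;
        this.executor = executor;
    }

    CompletableFuture<Collection<String>> asyncLookupBatch(List<Float> keys) {
        return CompletableFuture.supplyAsync(() -> {
            List<Float> predictions = model.apply(keys); // one call for the whole batch
            if (predictions.size() != keys.size()) {
                // Fails the returned future (wrapped in a CompletionException).
                throw new IllegalStateException(
                        "expected " + keys.size() + " predictions, got " + predictions.size());
            }
            List<String> results = new ArrayList<>(keys.size());
            for (int i = 0; i < keys.size(); i++) {
                // predictions.get(i) belongs to keys.get(i)
                results.add(keys.get(i) + "=" + predictions.get(i));
            }
            return results;
        }, executor);
    }
}
```

In an actual `AsyncBatchLookupFunction`, such an executor would typically be created in `open()` and shut down in `close()`.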
### Using in SQL
```sql
-- Create a lookup table with batch async support
CREATE TABLE model_predictions (
    feature_id INT,
    prediction FLOAT
) WITH (
    'connector' = 'my-batch-inference-connector',
    'async.batch.size' = '32',
    'async.batch.timeout' = '100ms'
);
-- Temporal join using batch async lookup
SELECT o.*, p.prediction
FROM orders AS o
JOIN model_predictions FOR SYSTEM_TIME AS OF o.proctime AS p
ON o.feature_id = p.feature_id;
```
## Design Principles
1. **Backward Compatible** - Does not modify the existing `AsyncLookupFunction` or `AsyncWaitOperator`
2. **Reuses Existing Runtime** - Delegates execution to `AsyncBatchWaitOperator`
3. **Declarative SQL Layer** - SQL describes *what* to look up; the runtime decides *how* to batch and execute it
4. **Extensible** - Provides clear entry points for future Python and ML connector support
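Design principle 2 implies an adapter that forwards the future returned by `asyncLookupBatch` to the completion callback used on the streaming side. The PR does not show the signature of `AsyncBatchFunction`, so this sketch assumes a callback shaped like Flink's `ResultFuture` (`complete` and `completeExceptionally`). `BatchCallback` and `FutureToCallbackAdapter` are hypothetical names and do not describe the PR's `AsyncBatchLookupJoinFunctionAdapter`.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical callback, modeled on the shape of Flink's ResultFuture.
interface BatchCallback<OUT> {
    void complete(Collection<OUT> results);
    void completeExceptionally(Throwable error);
}

// Hypothetical adapter: forwards the outcome of a future-returning batch lookup
// to a callback, which is how a callback-driven operator would consume it.
final class FutureToCallbackAdapter<IN, OUT> {
    private final Function<List<IN>, CompletableFuture<Collection<OUT>>> batchLookup;

    FutureToCallbackAdapter(Function<List<IN>, CompletableFuture<Collection<OUT>>> batchLookup) {
        this.batchLookup = batchLookup;
    }

    void asyncInvokeBatch(List<IN> inputs, BatchCallback<OUT> callback) {
        CompletableFuture<Collection<OUT>> future;
        try {
            future = batchLookup.apply(inputs);
        } catch (RuntimeException e) {
            // A lookup that throws synchronously is reported like an async failure.
            callback.completeExceptionally(e);
            return;
        }
        future.whenComplete((results, error) -> {
            if (error != null) {
                // Unwrap CompletionException so the callback sees the original cause.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause()
                        : error;
                callback.completeExceptionally(cause);
            } else {
                callback.complete(results);
            }
        });
    }
}
```

Unwrapping `CompletionException` means the callback sees the original cause whether the future failed directly or through a dependent stage.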
## Verifying this change
This change added tests and can be verified as follows:
### Unit Tests (flink-table-common)
- `AsyncBatchLookupFunctionTest` - Tests batch lookup semantics and single-key delegation
- `AsyncBatchLookupFunctionProviderTest` - Tests provider configuration and validation
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no** (new code path only)
- Anything that affects deployment or recovery: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **JavaDocs**
## Future Work (TODOs in code)
- Planner integration for automatic batch async lookup detection
- SQL hint support for batch async parameters
- Event-time based batching
- Python API integration
- ML Connector integration