featzhang opened a new pull request, #27360:
URL: https://github.com/apache/flink/pull/27360

   ## What is the purpose of the change
   
   This PR adds **SQL/Table API integration** for the existing 
`AsyncBatchWaitOperator` runtime capability, enabling batch-oriented async 
lookup joins for AI/ML inference scenarios.
   
   Building on previous PRs that introduced:
   - `AsyncBatchFunction` and `AsyncBatchWaitOperator` (size/time-based 
batching)
   - Retry and timeout strategies
   - Comprehensive metrics
   
   This PR bridges the gap between the streaming runtime and the SQL/Table API 
layer.
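
   For readers unfamiliar with size/time-based batching, the idea can be sketched roughly as follows. This is not the `AsyncBatchWaitOperator` code: `BatchBufferSketch`, its method names, and the caller-supplied `nowMillis` clock are hypothetical and chosen only for illustration. A real operator would presumably rely on Flink's timer service instead.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   /** Hypothetical buffer that emits a batch when it is full or has waited too long. */
   final class BatchBufferSketch<T> {
       private final int maxBatchSize;
       private final long batchTimeoutMillis;
       private final List<T> buffer = new ArrayList<>();
       private long firstElementTimeMillis = -1L;

       BatchBufferSketch(int maxBatchSize, long batchTimeoutMillis) {
           if (maxBatchSize <= 0 || batchTimeoutMillis < 0) {
               throw new IllegalArgumentException("invalid batch configuration");
           }
           this.maxBatchSize = maxBatchSize;
           this.batchTimeoutMillis = batchTimeoutMillis;
       }

       /** Buffers an element; returns a full batch once the size threshold is reached. */
       List<T> add(T element, long nowMillis) {
           if (buffer.isEmpty()) {
               firstElementTimeMillis = nowMillis; // timeout is measured from the first element
           }
           buffer.add(element);
           return buffer.size() >= maxBatchSize ? drain() : Collections.emptyList();
       }

       /** Timer callback; returns a partial batch once the oldest element has waited long enough. */
       List<T> onTimer(long nowMillis) {
           boolean expired = !buffer.isEmpty()
                   && nowMillis - firstElementTimeMillis >= batchTimeoutMillis;
           return expired ? drain() : Collections.emptyList();
       }

       private List<T> drain() {
           List<T> batch = new ArrayList<>(buffer);
           buffer.clear();
           firstElementTimeMillis = -1L;
           return batch;
       }

       public static void main(String[] args) {
           BatchBufferSketch<String> b = new BatchBufferSketch<>(2, 100L);
           System.out.println(b.add("a", 0L));
           System.out.println(b.add("b", 10L));
       }
   }
   ```

   In this sketch a batch is emitted by whichever trigger fires first, so a slow input stream still makes progress after the timeout while a fast one is bounded by the batch size.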
   
   ## Brief change log
   
   ### New API Classes (flink-table-common)
   
   | File | Description |
   |------|-------------|
   | `AsyncBatchLookupFunction.java` | New lookup function base class for batch async operations |
   | `AsyncBatchLookupFunctionProvider.java` | Provider for creating batch lookup functions with configuration |
   
   ### New Runtime Classes (flink-table-runtime)
   
   | File | Description |
   |------|-------------|
   | `AsyncBatchLookupJoinRunner.java` | Batch-oriented async lookup join runner |
   | `AsyncBatchLookupJoinFunctionAdapter.java` | Adapter bridging Table API to streaming AsyncBatchFunction |
   
   ### Modified Classes
   
   | File | Description |
   |------|-------------|
   | `FunctionKind.java` | Added `ASYNC_BATCH_TABLE` enum value |
   | `LookupJoinUtil.java` | Added batch async lookup detection and options extraction |
   
   ## API Design
   
   ### AsyncBatchLookupFunction
   
   ```java
   @PublicEvolving
   public abstract class AsyncBatchLookupFunction extends AsyncTableFunction<RowData> {
       
       // Primary batch lookup method
       public abstract CompletableFuture<Collection<RowData>> asyncLookupBatch(List<RowData> keyRows);
       
       // Single-key lookup delegates to batch
       public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
   }
   ```
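
   The delegation from the single-key method to the batch method could look roughly like the sketch below. It is only an illustration under assumptions: `BatchLookupSketch` and `UpperCaseLookup` are hypothetical stand-ins, the type parameters `K` and `R` replace `RowData` so the snippet compiles without Flink on the classpath, and the actual method body in the PR may differ.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.List;
   import java.util.Locale;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical stand-in for AsyncBatchLookupFunction (K and R replace RowData). */
   abstract class BatchLookupSketch<K, R> {

       /** Primary batch lookup method that subclasses implement. */
       abstract CompletableFuture<Collection<R>> asyncLookupBatch(List<K> keys);

       /** Single-key lookup wraps the key in a one-element batch. */
       CompletableFuture<Collection<R>> asyncLookup(K key) {
           return asyncLookupBatch(Collections.singletonList(key));
       }
   }

   /** Toy implementation: "looks up" each key by upper-casing it. */
   final class UpperCaseLookup extends BatchLookupSketch<String, String> {

       @Override
       CompletableFuture<Collection<String>> asyncLookupBatch(List<String> keys) {
           Collection<String> results = new ArrayList<>();
           for (String key : keys) {
               results.add(key.toUpperCase(Locale.ROOT));
           }
           return CompletableFuture.completedFuture(results);
       }

       public static void main(String[] args) {
           UpperCaseLookup lookup = new UpperCaseLookup();
           System.out.println(lookup.asyncLookup("a").join());
       }
   }
   ```

   With this shape, an implementer only writes the batch path, and callers that still issue single-key lookups are served by the same code.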
   
   ### AsyncBatchLookupFunctionProvider
   
   ```java
   @PublicEvolving
   public interface AsyncBatchLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
       
       // Factory methods with configuration
       static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func);
       static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func, int maxBatchSize);
       static AsyncBatchLookupFunctionProvider of(AsyncBatchLookupFunction func, int maxBatchSize, Duration timeout);
       
       // Function factory and configuration getters
       AsyncBatchLookupFunction createAsyncBatchLookupFunction();
       int getMaxBatchSize();
       Duration getBatchTimeout();
   }
   ```
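
   One way the overloaded factory methods and their validation could be wired is sketched below. Everything here is an assumption made for illustration: `BatchProviderSketch` is a hypothetical stand-in, the type parameter `F` replaces `AsyncBatchLookupFunction`, and the default values and validation rules are invented, since the PR description does not state them.

   ```java
   import java.time.Duration;
   import java.util.Objects;

   /** Hypothetical stand-in for AsyncBatchLookupFunctionProvider. */
   interface BatchProviderSketch<F> {

       // Defaults chosen for illustration only; the real defaults are not stated in the PR.
       int DEFAULT_MAX_BATCH_SIZE = 32;
       Duration DEFAULT_BATCH_TIMEOUT = Duration.ofMillis(100);

       F createFunction();

       int getMaxBatchSize();

       Duration getBatchTimeout();

       static <F> BatchProviderSketch<F> of(F func) {
           return of(func, DEFAULT_MAX_BATCH_SIZE);
       }

       static <F> BatchProviderSketch<F> of(F func, int maxBatchSize) {
           return of(func, maxBatchSize, DEFAULT_BATCH_TIMEOUT);
       }

       static <F> BatchProviderSketch<F> of(F func, int maxBatchSize, Duration timeout) {
           Objects.requireNonNull(func, "func");
           Objects.requireNonNull(timeout, "timeout");
           if (maxBatchSize <= 0) {
               throw new IllegalArgumentException("maxBatchSize must be positive");
           }
           if (timeout.isNegative()) {
               throw new IllegalArgumentException("timeout must not be negative");
           }
           // All overloads funnel into this one, so validation lives in a single place.
           return new BatchProviderSketch<F>() {
               @Override
               public F createFunction() {
                   return func;
               }

               @Override
               public int getMaxBatchSize() {
                   return maxBatchSize;
               }

               @Override
               public Duration getBatchTimeout() {
                   return timeout;
               }
           };
       }
   }
   ```

   Chaining the shorter overloads into the three-argument one keeps defaults and checks consistent across all entry points.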
   
   ## Example Usage
   
   ### Implementing a Batch Lookup Source
   
   ```java
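   // JDK imports. Imports for the Flink classes (LookupTableSource, RowData,
   // AsyncBatchLookupFunction, AsyncBatchLookupFunctionProvider) are omitted.
   import java.time.Duration;
   import java.util.Collection;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   
   public class MyBatchLookupTableSource implements LookupTableSource {
   
       @Override
       public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
           return AsyncBatchLookupFunctionProvider.of(
               new MyBatchInferenceFunction(),
               32,  // maxBatchSize
               Duration.ofMillis(100)  // batchTimeout
           );
       }
   
       // copy() and asSummaryString() omitted for brevity
   }
   
   public class MyBatchInferenceFunction extends AsyncBatchLookupFunction {
   
       // modelService, extractFeatures(...) and createResultRow(...) are
       // user-provided and not shown here.
   
       @Override
       public CompletableFuture<Collection<RowData>> asyncLookupBatch(List<RowData> keyRows) {
           return CompletableFuture.supplyAsync(() -> {
               // Batch ML inference call
               List<float[]> features = keyRows.stream()
                   .map(this::extractFeatures)
                   .collect(Collectors.toList());
               
               List<float[]> predictions = modelService.batchPredict(features);
               
               // Pair each key row with its prediction by position; this assumes
               // batchPredict returns one prediction per input, in input order.
               return IntStream.range(0, keyRows.size())
                   .mapToObj(i -> createResultRow(keyRows.get(i), predictions.get(i)))
                   .collect(Collectors.toList());
           });
       }
   }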
   ```
   
   ### Using in SQL
   
   ```sql
   -- Create a lookup table with batch async support
   CREATE TABLE model_predictions (
       feature_id INT,
       prediction FLOAT
   ) WITH (
       'connector' = 'my-batch-inference-connector',
       'async.batch.size' = '32',
       'async.batch.timeout' = '100ms'
   );
   
   -- Temporal join using batch async lookup
   SELECT o.*, p.prediction
   FROM orders AS o
   JOIN model_predictions FOR SYSTEM_TIME AS OF o.proctime AS p
   ON o.feature_id = p.feature_id;
   ```
   
   ## Design Principles
   
   1. **Backward Compatible** - Does not modify existing `AsyncLookupFunction` 
or `AsyncWaitOperator`
   2. **Reuses Existing Runtime** - Bridges to `AsyncBatchWaitOperator` for 
execution
   3. **SQL Layer Only Describes** - SQL describes "what", runtime decides "how"
   4. **Extensible** - Clear entry points for future Python/ML Connector support
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   ### Unit Tests (flink-table-common)
   - `AsyncBatchLookupFunctionTest` - Tests batch lookup semantics and 
delegation
   - `AsyncBatchLookupFunctionProviderTest` - Tests provider configuration and 
validation
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **no** (new 
code path only)
   - Anything that affects deployment or recovery: **no**
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **yes**
   - If yes, how is the feature documented? **JavaDocs**
   
   
   ## Future Work (TODOs in code)
   
   - Planner integration for automatic batch async lookup detection
   - SQL hint support for batch async parameters
   - Event-time based batching
   - Python API integration
   - ML Connector integration

