[ https://issues.apache.org/jira/browse/FLINK-39074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

featzhang updated FLINK-39074:
------------------------------
    Description: 
PyFlink currently lacks native support for AI model inference, forcing users to 
manually manage models, batching, and resources. This proposal introduces a new 
DataStream.infer() API that provides out-of-the-box AI inference capabilities 
with automatic lifecycle management.
h2. MOTIVATION

Current Pain Points:
• Users must manually load/unload models
• No built-in batching for inference optimization
• No standardized resource management
• Lack of warmup and performance optimization strategies

User Impact:
• Complicated boilerplate code for inference
• Suboptimal performance due to lack of batching
• Resource leaks from improper model management
h2. PROPOSED API

Text Embedding Example:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

result = data_stream.infer(
    model="sentence-transformers/all-MiniLM-L6-v2",
    input_col="text",
    output_col="embedding"
)
{code}
Sentiment Classification Example:
{code:python}
sentiment = data_stream.infer(
    model="distilbert-base-uncased-finetuned-sst-2-english",
    input_col="text",
    output_col="sentiment",
    task_type="classification"
)
{code}
h2. ARCHITECTURE

{noformat}
DataStream.infer()
    ↓
InferenceFunction (MapFunction)
    ↓
ModelLifecycleManager
    ├── Model Loading (HuggingFace/Local)
    ├── Model Warmup
    └── Resource Management
    ↓
BatchInferenceExecutor
    ├── Tokenization
    ├── Batch Inference
    └── Result Extraction
    ↓
InferenceMetrics
{noformat}
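
To make the data flow concrete, here is a minimal sketch of how these components could fit together on the Python side. Only {{DataStream.infer()}}, {{ModelLifecycleManager}} and {{BatchInferenceExecutor}} are names from this proposal; the method names ({{load}}, {{warmup}}, {{close}}, {{run_batch}}) and the mean-pooling step are illustrative assumptions rather than the actual implementation, while {{AutoTokenizer.from_pretrained}} and {{AutoModel.from_pretrained}} are standard HuggingFace Transformers APIs.
{code:python}
from typing import List

import torch
from transformers import AutoModel, AutoTokenizer


class ModelLifecycleManager:
    """Illustrative sketch: load, warm up and release a HuggingFace model."""

    def __init__(self, model_name: str, device: str = "cpu"):
        self.model_name = model_name
        self.device = device
        self.tokenizer = None
        self.model = None

    def load(self):
        # Accepts both HuggingFace Hub names and local paths.
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModel.from_pretrained(self.model_name).to(self.device)
        self.model.eval()

    def warmup(self):
        # One dummy forward pass so the first real record does not pay for
        # lazy initialization (CUDA kernels, tokenizer caches, ...).
        inputs = self.tokenizer(["warmup"], return_tensors="pt").to(self.device)
        with torch.no_grad():
            self.model(**inputs)

    def close(self):
        # Drop the model and release cached GPU memory on operator shutdown.
        self.model = None
        self.tokenizer = None
        if self.device.startswith("cuda"):
            torch.cuda.empty_cache()


class BatchInferenceExecutor:
    """Illustrative sketch: tokenize a batch, run inference, extract results."""

    def __init__(self, manager: ModelLifecycleManager):
        self.manager = manager

    def run_batch(self, texts: List[str]) -> List[List[float]]:
        inputs = self.manager.tokenizer(
            texts, padding=True, truncation=True, return_tensors="pt"
        ).to(self.manager.device)
        with torch.no_grad():
            outputs = self.manager.model(**inputs)
        # Simple mean pooling over tokens; real embedding models may use
        # attention-mask-weighted pooling instead.
        return outputs.last_hidden_state.mean(dim=1).cpu().tolist()
{code}
The {{InferenceFunction}} ({{MapFunction}}) would then typically hold one manager/executor pair per parallel subtask, calling {{load()}}/{{warmup()}} in {{open()}} and {{close()}} in {{close()}}.
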
h2. KEY FEATURES

Model Lifecycle Management:
• Automatic model loading from HuggingFace Hub or local path
• Model warmup for optimal performance
• Proper cleanup and resource deallocation

Batch Inference:
• Configurable batch size
• Batch timeout control
• Future: Integration with AsyncBatchFunction (FLINK-38825)
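
As a rough illustration of how {{batch_size}} and {{max_batch_timeout_ms}} interact, the {{BatchBuffer}} below is a hypothetical helper (not part of the proposed implementation) that flushes either when the batch is full or when the oldest buffered record has waited too long:
{code:python}
import time
from typing import Any, Callable, List


class BatchBuffer:
    """Hypothetical helper: collect records, flush on size or timeout."""

    def __init__(self, on_flush: Callable[[List[Any]], None],
                 batch_size: int = 32, max_batch_timeout_ms: int = 100):
        self.on_flush = on_flush
        self.batch_size = batch_size
        self.max_batch_timeout_ms = max_batch_timeout_ms
        self._buffer: List[Any] = []
        self._first_arrival = 0.0

    def add(self, record: Any) -> None:
        if not self._buffer:
            self._first_arrival = time.monotonic()
        self._buffer.append(record)
        waited_ms = (time.monotonic() - self._first_arrival) * 1000.0
        if len(self._buffer) >= self.batch_size or waited_ms >= self.max_batch_timeout_ms:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self.on_flush(self._buffer)
            self._buffer = []
{code}
In a real operator the timeout flush has to be driven by a timer or by AsyncBatchFunction (FLINK-38825) rather than only being checked when the next record arrives, which is why Phase 1 still processes records one by one.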

Multi-Task Support:
• Text embedding
• Text classification
• Text generation

Resource Optimization:
• CPU/GPU device selection
• FP16 precision support
• CUDA memory management
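
For illustration, the bullets above map to standard PyTorch calls; the {{prepare_model}} and {{release_gpu_memory}} helpers below are hypothetical, not proposed API:
{code:python}
import torch
from transformers import AutoModel


def prepare_model(model_name: str, device: str = "cpu", use_fp16: bool = False):
    """Hypothetical helper: place a model on the requested device, optionally in FP16."""
    model = AutoModel.from_pretrained(model_name)
    model = model.to(device)      # e.g. "cpu" or "cuda:0"
    if use_fp16 and device.startswith("cuda"):
        model = model.half()      # FP16 precision on GPU
    model.eval()
    return model


def release_gpu_memory() -> None:
    """Hypothetical helper: free cached CUDA memory once the model is dropped."""
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
{code}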

Metrics & Monitoring:
• Inference latency (avg/p50/p95/p99)
• Throughput tracking
• Error rate monitoring
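
A sketch of how the percentiles above could be derived from per-batch latency samples with numpy (a required dependency); the class and field names are illustrative, not the actual {{InferenceMetrics}} implementation:
{code:python}
import numpy as np


class LatencyTracker:
    """Illustrative sketch: track latency percentiles, throughput and error rate."""

    def __init__(self):
        self.latencies_ms = []
        self.records_processed = 0
        self.errors = 0

    def record(self, latency_ms: float, num_records: int, failed: bool = False) -> None:
        self.latencies_ms.append(latency_ms)
        self.records_processed += num_records
        self.errors += int(failed)

    def snapshot(self, window_seconds: float) -> dict:
        if not self.latencies_ms:
            return {}
        lat = np.asarray(self.latencies_ms)
        return {
            "latency_avg_ms": float(lat.mean()),
            "latency_p50_ms": float(np.percentile(lat, 50)),
            "latency_p95_ms": float(np.percentile(lat, 95)),
            "latency_p99_ms": float(np.percentile(lat, 99)),
            "throughput_rps": self.records_processed / window_seconds,
            "error_rate": self.errors / len(self.latencies_ms),
        }
{code}
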
h2. CONFIGURATION OPTIONS

||Parameter||Type||Default||Description||
|model|string|(required)|Model name (HuggingFace) or local path|
|input_col|string|(required)|Input column name|
|output_col|string|(required)|Output column name|
|batch_size|int|32|Batch size for inference|
|max_batch_timeout_ms|int|100|Max batch wait time in milliseconds|
|model_warmup|bool|true|Enable model warmup|
|device|string|"cpu"|Device: cpu, cuda:0, etc.|
|num_workers|int|1|Number of worker processes|
|task_type|string|"embedding"|Task type: embedding/classification/generation|
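
Combining the options above, a hedged end-to-end example; the parameter names come from the table, while the source stream, the values and the overall wiring are illustrative assumptions about the proposed API:
{code:python}
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection(
    [Row(text="Apache Flink makes stream processing easy.")],
    type_info=Types.ROW_NAMED(["text"], [Types.STRING()]),
)

embeddings = data_stream.infer(
    model="sentence-transformers/all-MiniLM-L6-v2",  # HuggingFace name or local path
    input_col="text",
    output_col="embedding",
    task_type="embedding",
    batch_size=64,              # override the default of 32
    max_batch_timeout_ms=50,    # flush partial batches after 50 ms
    model_warmup=True,
    device="cuda:0",            # or "cpu"
    num_workers=2,
)
embeddings.print()
env.execute("infer_example")
{code}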

  was:
h3. Overview

PyFlink currently lacks native support for AI model inference, forcing users to 
manually manage models, batching, and resources. This proposal introduces a new 
{{DataStream.infer()}} API that provides out-of-the-box AI inference 
capabilities with automatic lifecycle management.
 
h3. Motivation

*Current Pain Points:*
1. Users must manually load/unload models
2. No built-in batching for inference optimization
3. No standardized resource management
4. Lack of warmup and performance optimization strategies

*User Impact:*
- Complicated boilerplate code for inference
- Suboptimal performance due to lack of batching
- Resource leaks from improper model management
 
h3. Proposed Solution
 
h4. 1. Simple API
{code:python}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Text embedding example
result = data_stream.infer(
    model="sentence-transformers/all-MiniLM-L6-v2",
    input_col="text",
    output_col="embedding"
)

# Sentiment classification example
sentiment = data_stream.infer(
    model="distilbert-base-uncased-finetuned-sst-2-english",
    input_col="text",
    output_col="sentiment",
    task_type="classification"
)
{code}

 
h4. 2. Architecture
{noformat}
DataStream.infer()
    ↓
InferenceFunction (MapFunction)
    ↓
ModelLifecycleManager
    ├── Model Loading (HuggingFace/Local)
    ├── Model Warmup
    └── Resource Management
    ↓
BatchInferenceExecutor
    ├── Tokenization
    ├── Batch Inference
    └── Result Extraction
    ↓
InferenceMetrics
{noformat}

 
h4. 3. Key Features

*Model Lifecycle Management*
- Automatic model loading from HuggingFace Hub or local path
- Model warmup for optimal performance
- Proper cleanup and resource deallocation

*Batch Inference*
- Configurable batch size
- Batch timeout control
- Future: Integration with AsyncBatchFunction (FLINK-38825)

*Multi-Task Support*
- Text embedding
- Text classification
- Text generation

*Resource Optimization*
- CPU/GPU device selection
- FP16 precision support
- CUDA memory management

*Metrics & Monitoring*
- Inference latency (avg/p50/p95/p99)
- Throughput tracking
- Error rate monitoring
 
h4. 4. Configuration Options
||Parameter||Type||Default||Description||
|model|string|-|Model name (HuggingFace) or local path (required)|
|input_col|string|-|Input column name (required)|
|output_col|string|-|Output column name (required)|
|batch_size|int|32|Batch size for inference|
|max_batch_timeout_ms|int|100|Max batch wait time|
|model_warmup|bool|true|Enable model warmup|
|device|string|"cpu"|Device: cpu, cuda:0, etc.|
|num_workers|int|1|Number of worker processes|
|task_type|string|"embedding"|Task type: embedding/classification/generation|
 
h3. Implementation Status

*✅ Completed (Phase 1 - MVP)*

Python Modules:
- {{pyflink.ml.inference.config}} - Configuration management
- {{pyflink.ml.inference.lifecycle}} - Model lifecycle management
- {{pyflink.ml.inference.executor}} - Batch inference execution
- {{pyflink.ml.inference.function}} - MapFunction implementation
- {{pyflink.ml.inference.metrics}} - Metrics collection
- {{pyflink.datastream.data_stream.infer()}} - Public API

Unit Tests:
- Configuration validation
- Metrics collection
- Mock-based inference tests

*⏳ In Progress / Planned*

Phase 2:
- [ ] Java-side InferenceOperator for better integration
- [ ] AsyncBatchFunction integration (depends on FLINK-38825)
- [ ] Python Worker pool management

Phase 3:
- [ ] ONNX Runtime support
- [ ] TensorFlow model support
- [ ] Model quantization
- [ ] Multi-model pipeline
 
h3. Dependencies

*Required Python Packages:*
{noformat}
torch>=2.0.0
transformers>=4.30.0
numpy>=1.21.0
{noformat}

*Optional:*
{noformat}
cuda-python>=11.0    # For GPU support
onnxruntime>=1.14.0  # For ONNX models
{noformat}

 
h3. Code Statistics
{noformat}
Module                  Lines
--------------------------------
config.py               90
lifecycle.py            188
executor.py             228
function.py             171
metrics.py              96
__init__.py             50
tests/test_inference.py 278
--------------------------------
Total                   1,101 lines
{noformat}

 
h3. Testing

*Unit Tests:* ✅ 11 tests passing
- Configuration validation
- Metrics calculation
- Mock inference logic

*Integration Tests:* ⏳ Planned
- End-to-end inference with real models
- GPU inference tests
- Performance benchmarks
 
h3. Performance Expectations (estimated)
||Scenario||Current (manual)||With .infer()||Improvement||
|CPU (batch=1)|10 rec/s|100 rec/s|*10x*|
|CPU (batch=32)|50 rec/s|500 rec/s|*10x*|
|GPU (batch=64)|200 rec/s|2000 rec/s|*10x*|
 
h3. Known Limitations (Phase 1)
 # *Batching:* Currently processes records one-by-one via MapFunction. True batching requires AsyncBatchFunction integration (FLINK-38825).
 # *Java Integration:* Pure Python implementation. Java-side operator not implemented yet.
 # *Model Support:* Currently HuggingFace Transformers only. ONNX/TensorFlow support planned.

 
h3. Documentation

*Planned Documentation:*
- User guide with examples
- API reference
- Best practices guide
- Performance tuning guide
 
h3. Risk Assessment

*Technical Risks:*
- Python/Java serialization overhead
- Memory management complexity
- GPU resource contention

*Mitigation:*
- Use Arrow format for efficient serialization
- Implement memory monitoring
- Provide GPU isolation options
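
For the Arrow-based mitigation above, a minimal pyarrow sketch (pyarrow is not listed in the dependencies, so treat it as an assumption) of shipping a micro-batch as a single IPC payload instead of pickling rows one by one:
{code:python}
import pyarrow as pa

# A micro-batch of input rows, e.g. collected on the sending side.
rows = [{"text": "hello"}, {"text": "world"}]
batch = pa.RecordBatch.from_pylist(rows)

# Serialize the whole batch as one Arrow IPC stream (one payload per batch).
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
payload = sink.getvalue()

# Deserialize on the Python worker side and hand the column to the executor.
reader = pa.ipc.open_stream(payload)
table = reader.read_all()
texts = table.column("text").to_pylist()
{code}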
 
h3. References
 * [HuggingFace Transformers|https://huggingface.co/docs/transformers]
 * [PyTorch Inference Guide|https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html]
 * Feature Design Doc


> Add built-in AI model inference capability to PyFlink with automatic 
> lifecycle management
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-39074
>                 URL: https://issues.apache.org/jira/browse/FLINK-39074
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / Python
>            Reporter: featzhang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
