kaori-seasons opened a new issue, #6762:
URL: https://github.com/apache/paimon/issues/6762

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   ## I. Overview
   
   ### 1.1 Existing Architecture
   
   Apache Paimon has already implemented a comprehensive Python API that 
supports multiple data format outputs:
   
   Current supported output formats include:
   - **Arrow Format**: `to_arrow()`, `to_arrow_batch_reader()` - provides 
stream-based batch processing
   - **Pandas**: `to_pandas()` - converts to DataFrame
   - **Ray**: `to_ray()` - distributed computing integration
   - **DuckDB**: `to_duckdb()` - SQL analytics
   - **Iterator**: `to_iterator()` - native Python row-level iteration
   
   ### 1.2 Core Capabilities
   -  **Sharded Reading**: Split-based data sharding mechanism
   -  **Predicate Pushdown**: Filtering condition pushdown optimization
   -  **Column Pruning**: Projection pushdown reduces data transfer
   -  **Streaming Processing**: Arrow RecordBatch streaming reads
   -  **Incremental Reading**: Supports timestamp range incremental queries
   
   ### 1.3 Technical Gap
   After code search confirmation, there is currently **no** PyTorch or 
TensorFlow integration code. Implementation is needed from scratch.
   
   ## II. Technical Solution Design
   
   ### 2.1 PyTorch Dataset Interface Implementation Plan
   
   #### 2.1.1 Architecture Design
   
   ```
   ┌─────────────────────────────────────────────────────────────┐
   │                   PyTorch Training Loop                      │
   └─────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
   ┌─────────────────────────────────────────────────────────────┐
   │                  PaimonIterableDataset                       │
   │  - Inherits torch.utils.data.IterableDataset                │
   │  - Supports worker sharding                                  │
   │  - Provides __iter__ method                                  │
   └─────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
   ┌─────────────────────────────────────────────────────────────┐
   │                     PaimonMapDataset                         │
   │  - Inherits torch.utils.data.Dataset                         │
   │  - Supports random access via __getitem__                    │
   │  - Caches splits metadata                                    │
   └─────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
   ┌─────────────────────────────────────────────────────────────┐
   │              TableRead (Existing Paimon API)                 │
   │  - to_arrow_batch_reader() for streaming                    │
   │  - to_arrow() for full table                                │
   │  - Split-based parallel reading                             │
   └─────────────────────────────────────────────────────────────┘
   ```
   
   #### 2.1.2 Implementation Details
   
   **File Structure**:
   ```
   paimon-python/pypaimon/ml/
   ├── __init__.py
   ├── pytorch/
   │   ├── __init__.py
   │   ├── dataset.py          # PaimonIterableDataset, PaimonMapDataset
   │   ├── transforms.py       # Data transformation utilities
   │   └── collate.py          # Batch collation functions
   └── tensorflow/
       ├── __init__.py
       └── dataset.py          # TensorFlow Dataset implementation
   ```
   
   **Core Class Design - PaimonIterableDataset**:
   
   Based on the existing `to_arrow_batch_reader()` method, implements streaming 
iteration:
   
   ```python
   class PaimonIterableDataset(torch.utils.data.IterableDataset):
       """
       Streaming dataset, suitable for large-scale data training
       - Supports PyTorch DataLoader multiprocessing
       - Automatically shards to different workers
       - Memory-efficient streaming reads
       """
   ```
   
   **Core Class Design - PaimonMapDataset**:
   
   Based on the `to_arrow()` method, implements random access:
   
   ```python
   class PaimonMapDataset(torch.utils.data.Dataset):
       """
       Map-style dataset, supports random access
       - Implements __len__ and __getitem__
       - Suitable for scenarios requiring random sampling
       - Can be used with PyTorch Sampler
       """
   ```
   
   #### 2.1.3 Key Technical Points
   
   **1. Worker Sharding Support**
   
   Leveraging Paimon's Shard Read functionality, automatically allocates in 
`__iter__`:
   
   ```python
   def __iter__(self):
       worker_info = torch.utils.data.get_worker_info()
       if worker_info is None:
           # Single process
           splits = self.scan.plan().splits()
       else:
           # Multi-process: uses with_shard for automatic sharding
           shard_index = worker_info.id
           total_shards = worker_info.num_workers
           splits = self.scan.with_shard(shard_index, 
total_shards).plan().splits()
   ```
   
   **2. Data Transformation Pipeline**
   
   Supports flexible data transformation:
   ```python
   class PaimonDatasetConfig:
       - transform: Optional[Callable]  # Custom transformation function
       - target_column: Optional[str]   # Label column
       - feature_columns: List[str]     # Feature columns
       - tensor_type: torch.dtype       # Output tensor type
   ```
   
   **3. Batch Processing Optimization**
   
   Leverages Arrow's efficient batch processing to reduce memory copies.
   
   ### 2.2 TensorFlow Dataset Interface Implementation Plan
   
   #### 2.2.1 Architecture Design
   
   ```
   ┌─────────────────────────────────────────────────────────────┐
   │                 TensorFlow Training Loop                     │
   └─────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
   ┌─────────────────────────────────────────────────────────────┐
   │                PaimonTensorFlowDataset                       │
   │  - Uses tf.data.Dataset.from_generator()                    │
   │  - Supports prefetching and parallelism                     │
   │  - TFRecord-compatible output                               │
   └─────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
   ┌─────────────────────────────────────────────────────────────┐
   │              TableRead (Existing Paimon API)                 │
   │  - to_arrow_batch_reader() for streaming                    │
   │  - Automatic type conversion to TF tensors                  │
   └─────────────────────────────────────────────────────────────┘
   ```
   
   #### 2.2.2 Implementation Strategy
   
   **Core Method**: Using `tf.data.Dataset.from_generator()`
   
   ```python
   class PaimonTensorFlowDataset:
       """
       TensorFlow Dataset wrapper
       - Based on from_generator implementation
       - Supports all tf.data API features
       - Automatic type inference and conversion
       """
       
       @staticmethod
       def from_paimon(table, read_builder, splits):
           def generator():
               # Use to_arrow_batch_reader for streaming generation
               batch_reader = table_read.to_arrow_batch_reader(splits)
               for batch in batch_reader:
                   # Convert to TF compatible format
                   yield convert_arrow_to_tf(batch)
           
           # Create tf.data.Dataset
           return tf.data.Dataset.from_generator(
               generator,
               output_signature=infer_tf_signature(read_builder.read_type())
           )
   ```
   
   #### 2.2.3 Key Technical Points
   
   **1. Type Mapping**
   
   Establishes mapping from Paimon data types to TensorFlow types:
   
   ```
   Paimon Type  →  TF Type
   INT/BIGINT   →  tf.int32/tf.int64
   FLOAT/DOUBLE →  tf.float32/tf.float64
   STRING       →  tf.string
   BOOLEAN      →  tf.bool
   TIMESTAMP    →  tf.int64 (milliseconds)
   ```
   
   **2. Performance Optimization**
   
   - Uses `.prefetch()` to pre-fetch data
   - Uses `.batch()` for dynamic batch processing
   - Uses `.map()` for parallel data transformation
   - Supports `tf.distribute.Strategy` for distributed training
   
   ## III. Implementation Plan
   
   ### 3.1 Phase One: PyTorch Dataset (High Priority)
   
   **Phase 1.1: Basic Implementation (2 weeks)**
   - [ ] Create `pypaimon/ml/pytorch/` module structure
   - [ ] Implement `PaimonIterableDataset` base class
   - [ ] Implement worker sharding logic
   - [ ] Add basic type conversion (Arrow → PyTorch Tensor)
   - [ ] Write unit tests
   
   **Phase 1.2: Feature Enhancement (1 week)**
   - [ ] Implement `PaimonMapDataset` for random access support
   - [ ] Add data transformation pipeline (`transforms.py`)
   - [ ] Implement custom `collate_fn`
   - [ ] Support common ML scenarios (classification, regression, sequence)
   
   **Phase 1.3: Testing and Documentation (1 week)**
   - [ ] End-to-end integration tests
   - [ ] Performance benchmark tests
   - [ ] Write user documentation and example code
   - [ ] Add to official documentation
   
   ### 3.2 Phase Two: TensorFlow Dataset (2 weeks)
   
   **Phase 2.1: Core Implementation**
   - [ ] Create `pypaimon/ml/tensorflow/` module
   - [ ] Implement `PaimonTensorFlowDataset` class
   - [ ] Establish type mapping system
   - [ ] Implement streaming generator
   
   **Phase 2.2: Optimization and Testing**
   - [ ] Performance tuning (prefetch, cache, parallel map)
   - [ ] Support `tf.distribute.Strategy`
   - [ ] Unit and integration tests
   - [ ] Documentation and examples
   
   ### 3.3 Phase Three: Production Optimization (1-2 weeks)
   
   **Performance Optimization**:
   - [ ] Zero-copy conversion (Arrow → Tensor)
   - [ ] Memory pool management
   - [ ] Adaptive batch sizing
   
   **Feature Completeness**:
   - [ ] Support online feature engineering
   - [ ] Add caching mechanism
   - [ ] Support data augmentation
   
   **Monitoring and Diagnostics**:
   - [ ] Add performance metrics
   - [ ] Data sampling preview
   - [ ] Error handling and retry logic
   
   ## IV. Dependency Management
   
   ### 4.1 Update requirements.txt
   
   Add optional dependencies:
   ```python
   # ML/AI framework support (optional)
   torch>=1.9.0; extra == "pytorch"
   tensorflow>=2.8.0; extra == "tensorflow"
   ```
   
   ### 4.2 Update setup.py
   
   ```python
   extras_require={
       'pytorch': ['torch>=1.9.0'],
       'tensorflow': ['tensorflow>=2.8.0'],
       'ml': ['torch>=1.9.0', 'tensorflow>=2.8.0'],
   }
   ```
   
   ## V. API Usage Examples
   
   ### 5.1 PyTorch Usage Example
   
   ```python
   from pypaimon import CatalogFactory
   from pypaimon.ml.pytorch import PaimonIterableDataset
   import torch
   from torch.utils.data import DataLoader
   
   # 1. Create Catalog and get table
   catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
   table = catalog.get_table('default.training_data')
   
   # 2. Configure reading
   read_builder = table.new_read_builder() \
       .with_projection(['feature1', 'feature2', 'label']) \
       .with_filter(predicate_builder.greater_than('timestamp', '2024-01-01'))
   
   # 3. Create PyTorch Dataset
   dataset = PaimonIterableDataset(
       read_builder=read_builder,
       feature_columns=['feature1', 'feature2'],
       target_column='label',
       transform=lambda x: torch.tensor(x, dtype=torch.float32)
   )
   
   # 4. Create DataLoader (automatically shards to multiple workers)
   dataloader = DataLoader(
       dataset,
       batch_size=32,
       num_workers=4  # Automatically uses Paimon shard read
   )
   
   # 5. Training loop
   for epoch in range(10):
       for batch in dataloader:
           features, labels = batch
           # Train model...
   ```
   
   ### 5.2 TensorFlow Usage Example
   
   ```python
   from pypaimon import CatalogFactory
   from pypaimon.ml.tensorflow import PaimonTensorFlowDataset
   import tensorflow as tf
   
   # 1. Get table and configure reading
   catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
   table = catalog.get_table('default.training_data')
   read_builder = table.new_read_builder()
   
   # 2. Create TensorFlow Dataset
   tf_dataset = PaimonTensorFlowDataset.from_paimon(
       table=table,
       read_builder=read_builder,
       feature_columns=['feature1', 'feature2'],
       label_column='label'
   )
   
   # 3. Apply TF transformations
   tf_dataset = tf_dataset \
       .batch(32) \
       .prefetch(tf.data.AUTOTUNE) \
       .cache()
   
   # 4. Train model
   model = create_model()
   model.fit(tf_dataset, epochs=10)
   ```
   
   ## VI. Quality Assurance
   
   ### 6.1 Testing Strategy
   
   **Unit Tests**: Based on existing test framework
   
   ```
   tests/ml/
   ├── pytorch/
   │   ├── test_iterable_dataset.py
   │   ├── test_map_dataset.py
   │   ├── test_transforms.py
   │   └── test_integration.py
   └── tensorflow/
       ├── test_tf_dataset.py
       └── test_integration.py
   ```
   
   
   
   ### Solution
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to