kaori-seasons opened a new issue, #6735:
URL: https://github.com/apache/paimon/issues/6735

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   ## I. Overview
   
   ### 1.1 Problem Description
   
   The PyPaimon `KeyValueDataWriter` currently hardcodes every row's `_VALUE_KIND` to 0 (INSERT), losing the row operation type. This prevents proper handling of CDC (Change Data Capture) scenarios involving UPDATE and DELETE operations.
   
   **Example Scenario:**
   - Write operation 1: User 'Alice' with RowKind=INSERT
   - Write operation 2: User 'Alice' updated to 'Alice_updated' with 
RowKind=UPDATE_AFTER
   - Write operation 3: User 'Alice' deleted with RowKind=DELETE
   - **Problem**: All three records are stored with _VALUE_KIND=0 (INSERT)
   - **Expected**: Each record should carry its correct _VALUE_KIND (0, 2, and 3 respectively)
   
   ### 1.2 Root Cause Analysis
   
   **Write-side Issue**: The `_add_system_fields()` method in 
`KeyValueDataWriter` hardcoded `_VALUE_KIND` values:
   
   ```python
   # Before: Always INSERT
   value_kind_column = pa.array([0] * num_rows, type=pa.int32())
   ```
   
   **Impact Areas:**
   - **CDC scenarios**: Cannot distinguish UPDATE_BEFORE/UPDATE_AFTER operations
   - **Delete handling**: Cannot mark DELETE rows (RowKind=3)
   - **Read-side consistency**: `DropDeleteReader` depends on correct RowKind values to drop deleted rows
   - **Data semantics**: Loss of operational metadata
   
   ### 1.3 RowKind Definition
   
   PyPaimon defines four RowKind types:
   - **INSERT (0)**: +I - New record insertion
   - **UPDATE_BEFORE (1)**: -U - Previous value before update
   - **UPDATE_AFTER (2)**: +U - New value after update
   - **DELETE (3)**: -D - Record deletion
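
   For orientation, this mapping corresponds to an integer enum. A minimal sketch (the real class lives in `pypaimon.table.row.row_kind` and may differ in detail):

   ```python
   from enum import IntEnum


   class RowKind(IntEnum):
       """Row operation kinds and the int32 codes stored in _VALUE_KIND."""
       INSERT = 0         # +I: new record insertion
       UPDATE_BEFORE = 1  # -U: previous value before an update
       UPDATE_AFTER = 2   # +U: new value after an update
       DELETE = 3         # -D: record deletion
   ```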
   
   ## II. Technical Solution Design
   
   ### 2.1 Core Design Approach
   
   Implement real RowKind extraction from input data by:
   1. Reading optional `__row_kind__` column from input RecordBatch
   2. Validating RowKind values (0-3 range)
   3. Using extracted values for `_VALUE_KIND` field
   4. Maintaining backward compatibility (defaulting to INSERT)
   
   ### 2.2 Solution Advantages
   
   1. **Full CDC Support**: Correctly handles all row operation types
   2. **Backward Compatible**: No breaking changes to existing API
   3. **Type-Safe**: Validates data types and value ranges
   4. **Well-Tested**: Comprehensive unit test coverage (12 tests)
   5. **Production-Ready**: Clean error messages and logging
   
   ### 2.3 API Design
   
   **User API - Method 1: Using `__row_kind__` column**
   ```python
   import pyarrow as pa
   from pypaimon.table.row.row_kind import RowKind
   
   data = pa.Table.from_pydict({
       'id': [1, 2, 3],
       'name': ['Alice', 'Bob', 'Charlie'],
       '__row_kind__': pa.array([
           RowKind.INSERT.value,      # 0
           RowKind.UPDATE_AFTER.value, # 2
           RowKind.DELETE.value        # 3
       ], type=pa.int32())
   })
   write.write_arrow(data)
   ```
   
   **User API - Method 2: Using row_kinds parameter**
   ```python
   data = pa.Table.from_pydict({
       'id': [1, 2, 3],
       'name': ['Alice', 'Bob', 'Charlie']
   })
   write.write_arrow(data, row_kinds=[0, 2, 3])
   ```
   
   ## III. Implementation Details
   
   ### 3.1 KeyValueDataWriter Modifications
   
   **File**: `paimon-python/pypaimon/write/writer/key_value_data_writer.py`
   
   #### New Method: `_extract_row_kind_column()`
   ```python
   def _extract_row_kind_column(self, data: pa.RecordBatch, num_rows: int) -> pa.Array:
       """Extract or generate RowKind column from input data.

       - Validates data type (must be int32)
       - Validates values are in range [0-3]
       - Returns extracted column or default INSERT (0) for all rows
       """
   ```
   
   **Key Features:**
   - Checks for `__row_kind__` column presence
   - Type validation: `int32` only
   - Value validation: 0-3 range check
   - Default behavior: All INSERT when column missing
   - Logging for debugging
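
   A minimal free-standing sketch of this extraction logic (column handling and error messages here are illustrative assumptions, not the final method body):

   ```python
   import logging

   import pyarrow as pa
   import pyarrow.compute as pc

   ROW_KIND_COLUMN = "__row_kind__"  # temporary column name used throughout this issue


   def extract_row_kind_column(data: pa.RecordBatch, num_rows: int) -> pa.Array:
       """Return the RowKind column if present, otherwise a default INSERT (0) column."""
       if ROW_KIND_COLUMN not in data.schema.names:
           # Backward-compatible default: treat every row as INSERT
           return pa.array([0] * num_rows, type=pa.int32())

       column = data.column(data.schema.get_field_index(ROW_KIND_COLUMN))
       if column.type != pa.int32():
           raise ValueError(f"{ROW_KIND_COLUMN} must be int32, got {column.type}")

       # Validate the [0, 3] value range in a single pass
       if num_rows > 0:
           bounds = pc.min_max(column).as_py()
           if bounds["min"] < 0 or bounds["max"] > 3:
               raise ValueError(f"{ROW_KIND_COLUMN} values must be in range [0, 3]")

       logging.debug("Using RowKind values from the %s column", ROW_KIND_COLUMN)
       return column
   ```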
   
   #### Updated Method: `_add_system_fields()`
   - Calls `_extract_row_kind_column()` instead of hardcoding
   - Removes temporary `__row_kind__` column after extraction
   - Maintains backward compatibility
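
   A rough sketch of how the updated `_add_system_fields()` could wire this in, reusing `extract_row_kind_column` from the sketch above (key-field prefixing and the exact system-field layout are simplified for illustration):

   ```python
   import pyarrow as pa


   def add_system_fields_sketch(data: pa.RecordBatch, start_sequence: int) -> pa.RecordBatch:
       """Attach sequence numbers and real RowKind values, then drop the temporary column."""
       num_rows = data.num_rows

       # 1. Extract real RowKind values (defaults to INSERT when __row_kind__ is absent)
       value_kind = extract_row_kind_column(data, num_rows)

       # 2. Drop the temporary __row_kind__ column so it is not persisted as user data
       names = [n for n in data.schema.names if n != "__row_kind__"]
       arrays = [data.column(data.schema.get_field_index(n)) for n in names]

       # 3. Append the system columns
       arrays.append(pa.array(range(start_sequence, start_sequence + num_rows), type=pa.int64()))
       names.append("_SEQUENCE_NUMBER")
       arrays.append(value_kind)
       names.append("_VALUE_KIND")
       return pa.RecordBatch.from_arrays(arrays, names=names)
   ```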
   
   #### New Method: `_deduplicate_by_primary_key()`
   - Deduplicates data by primary key
   - Preserves latest record (maximum sequence number)
   - O(n) time complexity
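
   An illustrative sketch of that deduplication, operating on plain dicts for clarity (the actual method would presumably work on Arrow data rather than Python dicts):

   ```python
   from typing import Dict, List, Tuple


   def deduplicate_by_primary_key_sketch(rows: List[dict], primary_keys: List[str]) -> List[dict]:
       """Keep only the latest row (largest _SEQUENCE_NUMBER) per primary key, in O(n)."""
       latest: Dict[Tuple, dict] = {}
       for row in rows:
           key = tuple(row[k] for k in primary_keys)
           current = latest.get(key)
           if current is None or row["_SEQUENCE_NUMBER"] > current["_SEQUENCE_NUMBER"]:
               latest[key] = row
       return list(latest.values())
   ```

   With this shape, two writes for the same primary key collapse to the one with the larger sequence number, matching the merge semantics above.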
   
   ### 3.2 BatchTableWrite API Enhancement
   
   **File**: `paimon-python/pypaimon/write/batch_table_write.py`
   
   #### Enhanced Methods:
   - `write_arrow(table, row_kinds: Optional[List[int]])` - Table-level write with RowKind
   - `write_arrow_batch(batch, row_kinds: Optional[List[int]])` - Batch-level write with RowKind
   
   #### New Helper Methods:
   - `_add_row_kind_column()` - Adds RowKind column to Table
   - `_add_row_kind_to_batch()` - Adds RowKind column to RecordBatch
   
   #### Improved Validation:
   - `_validate_pyarrow_schema()` - Now ignores temporary `__row_kind__` column
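
   A minimal sketch of the `row_kinds` path: the helper materializes the list as a temporary int32 `__row_kind__` column before the data enters the normal write path (the helper body below is an assumption for illustration):

   ```python
   from typing import List, Optional

   import pyarrow as pa


   def add_row_kind_column(table: pa.Table, row_kinds: Optional[List[int]]) -> pa.Table:
       """Attach row_kinds as a temporary __row_kind__ column, validating its length."""
       if row_kinds is None:
           return table  # no RowKind information: downstream defaults everything to INSERT
       if len(row_kinds) != table.num_rows:
           raise ValueError(
               f"row_kinds has {len(row_kinds)} entries but the table has {table.num_rows} rows")
       return table.append_column("__row_kind__", pa.array(row_kinds, type=pa.int32()))
   ```

   With a helper like this, `write_arrow(data, row_kinds=[0, 2, 3])` can behave exactly like writing a table that already carries a `__row_kind__` column, so both user-facing methods converge on the same extraction path in `KeyValueDataWriter`.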
   
   
   ## IV. Before and After Comparison
   
   ### 4.1 Write Data Flow
   
   | Aspect | Before | After |
   |--------|--------|-------|
   | **RowKind Support** | Hardcoded INSERT (0) | Real RowKind (0-3) |
   | **CDC Scenarios** | Not supported | Fully supported |
   | **UPDATE Operations** | Lost metadata | Preserved as BEFORE/AFTER |
   | **DELETE Operations** | Indistinguishable | Marked as DELETE (3) |
   | **API** | No row_kinds parameter | Optional row_kinds parameter |
   | **Backward Compatibility** | N/A | 100% compatible |
   | **Default Behavior** | All INSERT | INSERT when no RowKind provided |
   
   ### 4.2 Example Data Transformation
   
   **Input Data (with RowKind):**
   ```
   id=1, name='Alice',   __row_kind__=0 (INSERT)
   id=2, name='Bob',     __row_kind__=2 (UPDATE_AFTER)
   id=3, name='Charlie', __row_kind__=3 (DELETE)
   ```
   
   **Before Implementation:**
   ```
   _KEY_id=1, name='Alice',   _VALUE_KIND=0, _SEQUENCE_NUMBER=1
   _KEY_id=2, name='Bob',     _VALUE_KIND=0, _SEQUENCE_NUMBER=2  ❌ _VALUE_KIND should be 2
   _KEY_id=3, name='Charlie', _VALUE_KIND=0, _SEQUENCE_NUMBER=3  ❌ _VALUE_KIND should be 3
   ```
   
   **After Implementation:**
   ```
   _KEY_id=1, name='Alice',   _VALUE_KIND=0, _SEQUENCE_NUMBER=1  ✅ Correct
   _KEY_id=2, name='Bob',     _VALUE_KIND=2, _SEQUENCE_NUMBER=2  ✅ Correct
   _KEY_id=3, name='Charlie', _VALUE_KIND=3, _SEQUENCE_NUMBER=3  ✅ Correct
   ```
   
   ### 4.3 Read-side Benefits
   
   **Without Real RowKind:**
   - DropDeleteReader cannot correctly identify DELETE rows
   - All rows appear as insertions
   - CDC semantics lost during reads
   
   **With Real RowKind:**
   - DropDeleteReader correctly filters DELETE rows (RowKind=3)
   - UPDATE operations maintain semantic correctness
   - Full CDC consistency across write-read pipeline
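
   Conceptually, dropping deletes on the read side is just a filter on `_VALUE_KIND` after merging by primary key. A sketch of the idea (not the actual `DropDeleteReader` code):

   ```python
   import pyarrow as pa
   import pyarrow.compute as pc

   ROW_KIND_DELETE = 3


   def drop_deletes(merged: pa.Table) -> pa.Table:
       """Drop rows whose _VALUE_KIND marks them as DELETE."""
       return merged.filter(pc.not_equal(merged["_VALUE_KIND"], ROW_KIND_DELETE))
   ```

   When every row is written with _VALUE_KIND=0, this filter is a no-op and deleted rows leak into query results; with real RowKind values it removes them as intended.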
   
   ## V. Performance Analysis
   
   ### 5.1 Complexity Analysis
   
   | Operation | Time | Space | Notes |
   |-----------|------|-------|-------|
   | Extract RowKind | O(n) | O(1) | Single pass validation |
   | Deduplication | O(n) | O(m) | m = distinct primary keys |
   | Add RowKind column | O(1) | O(n) | PyArrow native operation |
   | Schema validation | O(1) | O(1) | Column presence check |
   
   ### 5.2 Memory Impact
   
   - Per-row overhead: 4 bytes (int32 RowKind)
   - Temporary column: Automatically cleaned up after extraction
   - Hash map for dedup: O(m) where m = unique primary keys
   
   
   ## Appendix: Code Examples
   
   ### Example 1: Basic RowKind Usage
   ```python
   import pyarrow as pa
   from pypaimon.table.row.row_kind import RowKind
   
   # Create data with RowKind information
   data = pa.Table.from_pydict({
       'user_id': [1, 2, 3, 4],
       'name': ['Alice', 'Bob', 'Charlie', 'Dave'],
       '__row_kind__': pa.array([
           RowKind.INSERT.value,       # 0 - New user
           RowKind.UPDATE_AFTER.value, # 2 - Updated name
           RowKind.DELETE.value,       # 3 - User deleted
           RowKind.INSERT.value        # 0 - New user
       ], type=pa.int32())
   })
   
   # Write with real RowKind values
   write.write_arrow(data)
   ```
   
   ### Example 2: Using row_kinds Parameter
   ```python
   # Data without __row_kind__ column
   data = pa.Table.from_pydict({
       'user_id': [1, 2, 3],
       'name': ['Alice', 'Bob', 'Charlie']
   })
   
   # Specify RowKind separately
   write.write_arrow(data, row_kinds=[0, 2, 3])
   ```
   
   ### Example 3: CDC Pipeline
   ```python
   # Simulate CDC events
   cdc_events = [
       {'user_id': 1, 'name': 'Alice', 'kind': 0},      # Insert
       {'user_id': 2, 'name': 'Bob', 'kind': 0},        # Insert
       {'user_id': 2, 'name': 'Bob_Updated', 'kind': 2}, # Update
       {'user_id': 3, 'name': 'Charlie', 'kind': 3},    # Delete
   ]
   
   # Process CDC events
   for event in cdc_events:
       data = pa.Table.from_pydict({
           'user_id': [event['user_id']],
           'name': [event['name']],
           '__row_kind__': pa.array([event['kind']], type=pa.int32())
       })
       write.write_arrow(data)
   ```
   
   
   ### Solution
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

