zhztheplayer commented on PR #11419:
URL: 
https://github.com/apache/incubator-gluten/pull/11419#issuecomment-3891877722

   > See if it's useful:
   > 
   > # Analysis of PR #11419 - Partition Key Generation Issue
   > ## PR Overview
   > **Title**: [GLUTEN-10215][VL] Delta write: Native statistics tracker to 
eliminate C2R overhead
   > 
   > **Purpose**: Adds a native job statistics tracker for Delta write to 
eliminate Columnar-to-Row (C2R) conversion overhead by using Velox's native 
aggregation capabilities.
   > 
   > ## Issue: Wrong Partition Key Generation
   > ### Root Cause Analysis
   > The PR introduces a new native statistics tracker 
(`GlutenDeltaJobStatsNativeTracker`) whose task-level tracker, 
`GlutenDeltaTaskStatsNativeTracker`, has an **empty implementation** for the 
`newPartition` method:
   > 
   > ```scala
   > override def newPartition(partitionValues: InternalRow): Unit = {}
   > ```
   > 
   > **Location in patch**: Line 633
   > 
   > ### Problem Explanation
   > #### Before the PR:
   > The fallback tracker properly delegates partition information:
   > 
   > ```scala
   > override def newPartition(partitionValues: InternalRow): Unit =
   >   delegate.newPartition(partitionValues)
   > ```
   > 
   > #### After the PR:
   > The native tracker **ignores** partition values:
   > 
   > ```scala
   > override def newPartition(partitionValues: InternalRow): Unit = {}
   > ```
   > 
   > ### Why This Causes Wrong Partition Keys
   > 1. **Missing Partition Context**: When `newPartition()` is called with 
partition values, the native tracker doesn't store or propagate this 
information to the underlying Delta statistics collection mechanism.
   > 2. **File-to-Partition Mapping Lost**: The tracker creates accumulators 
per file path but doesn't associate them with their partition values:
   >    ```scala
   >    override def newFile(filePath: String): Unit = {
   >      accumulators.getOrElseUpdate(
   >        filePath,
   >        new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, 
dataCols, statsColExpr)
   >      )
   >    }
   >    ```
   > 3. **Statistics Without Partition Info**: When statistics are collected, 
they lack the partition context needed to generate correct partition keys in 
the Delta log.
   > 
   > ### Impact
   > * **Partition columns are not correctly tracked** in the statistics
   > * **Delta Lake metadata** may have incorrect or missing partition 
information
   > * **Query performance degradation** due to incorrect partition pruning
   > * **Data correctness issues** if partition-based operations rely on this 
metadata
   > 
   > ## Solution
   > ### Option 1: Store and Use Partition Values (Recommended)
   > ```scala
   > private class GlutenDeltaTaskStatsNativeTracker(
   >     delegate: WriteTaskStatsTracker,
   >     dataCols: Seq[Attribute],
   >     statsColExpr: Expression,
   >     resultThreadRunner: ThreadPoolExecutor)
   >   extends WriteTaskStatsTracker {
   >   
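   >   private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]()
   >   private val fileToPartition = mutable.Map[String, InternalRow]()  // ADD THIS
   >   // Most recent partition values; null until newPartition() is first called
   >   private var currentPartitionValues: InternalRow = _  // ADD THIS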
   >   private val evaluator = NativePlanEvaluator.create(
   >     BackendsApiManager.getBackendName,
   >     Map.empty[String, String].asJava)
   > 
   >   override def newPartition(partitionValues: InternalRow): Unit = {
   >     // Store current partition values for subsequent file operations
   >     currentPartitionValues = partitionValues  // ADD THIS
   >   }
   > 
   >   override def newFile(filePath: String): Unit = {
   >     accumulators.getOrElseUpdate(
   >       filePath,
   >       new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, 
dataCols, statsColExpr)
   >     )
   >     // Associate file with its partition
   >     if (currentPartitionValues != null) {  // ADD THIS
   >       fileToPartition(filePath) = currentPartitionValues.copy()
   >     }
   >   }
   > 
   >   override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
   >     // Use fileToPartition mapping when building statistics
   >     // to ensure correct partition keys
   >     // ... implementation needs to pass partition info to delegate
   >   }
   > }
   > ```
   > 
   > ### Option 2: Delegate to Underlying Tracker
   > If the native tracker doesn't need to handle partitions directly:
   > 
   > ```scala
   > override def newPartition(partitionValues: InternalRow): Unit = {
   >   delegate.newPartition(partitionValues)
   > }
   > ```
   > 
   > ### Option 3: Hybrid Approach
   > Store partition values AND delegate:
   > 
   > ```scala
   > private var currentPartitionValues: InternalRow = _
   > 
   > override def newPartition(partitionValues: InternalRow): Unit = {
   >   currentPartitionValues = partitionValues
   >   delegate.newPartition(partitionValues)
   > }
   > ```
   > 
   > ## Testing Recommendations
   > 1. **Add partition-specific tests**:
   >    ```scala
   >    test("native stats tracker preserves partition values") {
   >      // Write data with multiple partitions
   >      // Verify partition keys in Delta log match expected values
   >    }
   >    ```
   > 2. **Verify statistics correctness**:
   >    ```scala
   >    test("partition statistics are correctly computed") {
   >      // Check min/max values per partition
   >      // Verify row counts per partition
   >    }
   >    ```
   > 3. **Test partition pruning**:
   >    ```scala
   >    test("queries with partition filters use correct statistics") {
   >      // Write partitioned data
   >      // Query with partition filter
   >      // Verify only relevant partitions are scanned
   >    }
   >    ```
   > 
   > ## Related Code Sections
   > ### Comparison with Other Trackers
   > 1. **Fallback Tracker** (Lines 411-413):
   >    ```scala
   >    override def newPartition(partitionValues: InternalRow): Unit =
   >      delegate.newPartition(partitionValues)
   >    ```
   >    ✅ Correctly delegates
   > 2. **Row Counting Tracker** (Lines 491-492):
   >    ```scala
   >    override def newPartition(partitionValues: InternalRow): Unit =
   >      delegate.newPartition(partitionValues)
   >    ```
   >    ✅ Correctly delegates
   > 3. **Native Tracker** (Line 633):
   >    ```scala
   >    override def newPartition(partitionValues: InternalRow): Unit = {}
   >    ```
   >    ❌ **PROBLEM: Empty implementation**
   > 
   > ## Conclusion
   > The issue is in the 
`GlutenDeltaJobStatsNativeTracker.GlutenDeltaTaskStatsNativeTracker` class 
where the `newPartition()` method has an empty implementation. This causes 
partition values to be lost, resulting in incorrect partition key generation in 
the Delta Lake metadata.
   > 
   > **Fix**: Implement proper partition value handling in the native tracker, 
either by:
   > 
   > 1. Storing and using partition values internally
   > 2. Delegating to the underlying tracker
   > 3. Both (recommended for robustness)
   > 
   > The fix should ensure partition values are correctly associated with files 
and propagated to the final statistics.
   
   @FelixYBW Just saw this. This was not actually the root cause: 
`override def newPartition(partitionValues: InternalRow): Unit = { }` 
was intentionally left empty, because the vanilla Delta stats writer also 
does nothing in this method.
   
   The problem was with `VeloxBlockStripes` and I've already pushed the fix. 
Now everything should be fine.
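
To illustrate why an empty `newPartition` can be harmless, here is a minimal sketch. It is not the Gluten or Delta code: `SimpleTaskStatsTracker`, `FileStats`, `PerFileStatsTracker` and `NoOpPartitionDemo` are hypothetical names, and the trait is a simplified stand-in for Spark's `WriteTaskStatsTracker` with no Spark dependency. It assumes statistics are keyed by file path only, so ignoring the partition callback does not change the per-file results.

```scala
import scala.collection.mutable

// Hypothetical per-file statistics: row count plus min/max of one Long column.
final case class FileStats(numRecords: Long, min: Long, max: Long)

// Simplified stand-in for a write-task stats tracker (assumption, not Spark's API).
trait SimpleTaskStatsTracker {
  def newPartition(partitionValues: Seq[Any]): Unit
  def newFile(filePath: String): Unit
  def newRow(filePath: String, value: Long): Unit
  def finalStats: Map[String, FileStats]
}

final class PerFileStatsTracker extends SimpleTaskStatsTracker {
  private val empty = FileStats(0L, Long.MaxValue, Long.MinValue)
  private val stats = mutable.LinkedHashMap.empty[String, FileStats]

  // Intentionally a no-op: nothing below depends on the partition values.
  override def newPartition(partitionValues: Seq[Any]): Unit = {}

  // Register the file once so that empty files still get an entry.
  override def newFile(filePath: String): Unit =
    if (!stats.contains(filePath)) stats(filePath) = empty

  // Fold one row into the statistics of the file it was written to.
  override def newRow(filePath: String, value: Long): Unit = {
    val s = stats.getOrElse(filePath, empty)
    stats(filePath) =
      FileStats(s.numRecords + 1, math.min(s.min, value), math.max(s.max, value))
  }

  override def finalStats: Map[String, FileStats] = stats.toMap
}

object NoOpPartitionDemo {
  // Writes two files in two partitions and returns the per-file statistics.
  def run(): Map[String, FileStats] = {
    val tracker = new PerFileStatsTracker
    tracker.newPartition(Seq("2024-01-01"))
    tracker.newFile("date=2024-01-01/part-0.parquet")
    tracker.newRow("date=2024-01-01/part-0.parquet", 5L)
    tracker.newRow("date=2024-01-01/part-0.parquet", 2L)
    tracker.newPartition(Seq("2024-01-02"))
    tracker.newFile("date=2024-01-02/part-0.parquet")
    tracker.newRow("date=2024-01-02/part-0.parquet", 9L)
    tracker.finalStats
  }
}
```

In this sketch each file ends up with its own statistics even though `newPartition` does nothing, which is the property the reply above relies on.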


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

