FelixYBW commented on PR #11419:
URL: https://github.com/apache/incubator-gluten/pull/11419#issuecomment-3889033830

   See if it's useful:
   
   # Analysis of PR #11419 - Partition Key Generation Issue
   
   ## PR Overview
   **Title**: [GLUTEN-10215][VL] Delta write: Native statistics tracker to 
eliminate C2R overhead
   
   **Purpose**: Adds a native job statistics tracker for Delta write to 
eliminate Columnar-to-Row (C2R) conversion overhead by using Velox's native 
aggregation capabilities.
   
   ## Issue: Wrong Partition Key Generation
   
   ### Root Cause Analysis
   
   The PR introduces a new native statistics tracker (`GlutenDeltaJobStatsNativeTracker`) whose per-task tracker (`GlutenDeltaTaskStatsNativeTracker`) has an **empty implementation** of the `newPartition` method:
   
   ```scala
   override def newPartition(partitionValues: InternalRow): Unit = {}
   ```
   
   **Location in patch**: Line 633
   
   ### Problem Explanation
   
   #### Before the PR:
   The fallback tracker properly delegates partition information:
   ```scala
   override def newPartition(partitionValues: InternalRow): Unit =
     delegate.newPartition(partitionValues)
   ```
   
   #### After the PR:
   The native tracker **ignores** partition values:
   ```scala
   override def newPartition(partitionValues: InternalRow): Unit = {}
   ```
   
   ### Why This Causes Wrong Partition Keys
   
   1. **Missing Partition Context**: When `newPartition()` is called with 
partition values, the native tracker doesn't store or propagate this 
information to the underlying Delta statistics collection mechanism.
   
   2. **File-to-Partition Mapping Lost**: The tracker creates accumulators per 
file path but doesn't associate them with their partition values:
      ```scala
      override def newFile(filePath: String): Unit = {
        accumulators.getOrElseUpdate(
          filePath,
          new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
        )
      }
      ```
   
   3. **Statistics Without Partition Info**: When statistics are collected, they lack the partition context needed to generate correct partition keys in the Delta log (see the call-sequence sketch after this list).
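
   To make the loss concrete, here is a small illustrative sketch (not code from the PR) of the calling pattern Spark's write path follows against a `WriteTaskStatsTracker`: the partition is announced via `newPartition` before the files created under it. The `ToyTracker` class below is purely hypothetical.

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
   import org.apache.spark.unsafe.types.UTF8String

   import scala.collection.mutable

   // Toy stand-in with the same newPartition/newFile shape as WriteTaskStatsTracker,
   // used only to illustrate how the file-to-partition association is built.
   class ToyTracker {
     private var currentPartition: InternalRow = _
     val fileToPartition = mutable.Map[String, InternalRow]()

     def newPartition(partitionValues: InternalRow): Unit = {
       // The PR's native tracker leaves this body empty; currentPartition then
       // never gets set and fileToPartition stays empty.
       currentPartition = partitionValues
     }

     def newFile(filePath: String): Unit = {
       if (currentPartition != null) fileToPartition(filePath) = currentPartition.copy()
     }
   }

   // Spark announces the partition first, then the file written under it:
   val tracker = new ToyTracker
   tracker.newPartition(new GenericInternalRow(Array[Any](UTF8String.fromString("2024-01-01"))))
   tracker.newFile("part-00000-c000.snappy.parquet")
   // With a working newPartition the file maps to its partition values; with an
   // empty one, nothing is recorded and the final stats cannot carry the key.
   ```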
   
   ### Impact
   
   - **Partition columns are not correctly tracked** in the statistics
   - **Delta Lake metadata** may have incorrect or missing partition information
   - **Query performance degradation** due to incorrect partition pruning
   - **Data correctness issues** if partition-based operations rely on this 
metadata
   
   ## Solution
   
   ### Option 1: Store and Use Partition Values (Recommended)
   
   ```scala
   private class GlutenDeltaTaskStatsNativeTracker(
       delegate: WriteTaskStatsTracker,
       dataCols: Seq[Attribute],
       statsColExpr: Expression,
       resultThreadRunner: ThreadPoolExecutor)
     extends WriteTaskStatsTracker {

     private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]()
     private val fileToPartition = mutable.Map[String, InternalRow]()  // ADD THIS
     private var currentPartitionValues: InternalRow = _               // ADD THIS
     private val evaluator = NativePlanEvaluator.create(
       BackendsApiManager.getBackendName,
       Map.empty[String, String].asJava)

     override def newPartition(partitionValues: InternalRow): Unit = {
       // Store the current partition values for subsequent file operations
       currentPartitionValues = partitionValues  // ADD THIS
     }

     override def newFile(filePath: String): Unit = {
       accumulators.getOrElseUpdate(
         filePath,
         new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
       )
       // Associate the file with its partition values
       if (currentPartitionValues != null) {  // ADD THIS
         fileToPartition(filePath) = currentPartitionValues.copy()
       }
     }

     override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
       // Use the fileToPartition mapping when building statistics
       // to ensure correct partition keys
       // ... implementation needs to pass partition info to the delegate
     }
   }
   ```
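
   The stubbed `getFinalStats` above would then need to surface the mapping. As a hedged sketch only: the concrete `WriteTaskStats` subclass used by the Gluten/Delta integration and the accumulator's result API are not shown in this excerpt, so `NativeWriteTaskStats` and `acc.result()` below are illustrative placeholders.

   ```scala
   // Hypothetical stats container; the real code would reuse whatever
   // WriteTaskStats subclass the Gluten/Delta write path already defines.
   case class NativeWriteTaskStats(
       fileStats: Map[String, String],           // file path -> serialized column stats
       filePartitions: Map[String, InternalRow]  // file path -> partition values
     ) extends WriteTaskStats

   override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
     val fileStats = accumulators.map {
       // `result()` stands in for however VeloxTaskStatsAccumulator exposes its output
       case (path, acc) => path -> acc.result()
     }.toMap
     // Carry the partition mapping alongside the per-file stats so the driver-side
     // job tracker can emit correct partition keys into the Delta log.
     NativeWriteTaskStats(fileStats, fileToPartition.toMap)
   }
   ```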
   
   ### Option 2: Delegate to Underlying Tracker
   
   If the native tracker doesn't need to handle partitions directly:
   
   ```scala
   override def newPartition(partitionValues: InternalRow): Unit = {
     delegate.newPartition(partitionValues)
   }
   ```
   
   ### Option 3: Hybrid Approach
   
   Store partition values AND delegate:
   
   ```scala
   private var currentPartitionValues: InternalRow = _
   
   override def newPartition(partitionValues: InternalRow): Unit = {
     currentPartitionValues = partitionValues
     delegate.newPartition(partitionValues)
   }
   ```
   
   ## Testing Recommendations
   
   1. **Add partition-specific tests** (a fuller sketch follows this list):
      ```scala
      test("native stats tracker preserves partition values") {
        // Write data with multiple partitions
        // Verify partition keys in Delta log match expected values
      }
      ```
   
   2. **Verify statistics correctness**:
      ```scala
      test("partition statistics are correctly computed") {
        // Check min/max values per partition
        // Verify row counts per partition
      }
      ```
   
   3. **Test partition pruning**:
      ```scala
      test("queries with partition filters use correct statistics") {
        // Write partitioned data
        // Query with partition filter
        // Verify only relevant partitions are scanned
      }
      ```
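
   A fuller, hedged sketch of the first test, assuming a standard Spark/Delta test harness (a `SparkSession` named `spark` and a `withTempDir` helper as in Spark's shared test traits); it checks the `partitionValues` recorded on each `add` action in the Delta log:

   ```scala
   test("native stats tracker preserves partition values") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath

       // Write data into four partitions through the native Delta write path.
       spark.range(0, 100)
         .selectExpr("id", "id % 4 AS part")
         .write
         .format("delta")
         .partitionBy("part")
         .save(path)

       // Collect the partitionValues recorded on each `add` action in the Delta log.
       val recordedParts = spark.read
         .json(s"$path/_delta_log/*.json")
         .where("add IS NOT NULL")
         .selectExpr("add.partitionValues.part AS part")
         .collect()
         .map(_.getString(0))
         .toSet

       assert(recordedParts == Set("0", "1", "2", "3"))
     }
   }
   ```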
   
   ## Related Code Sections
   
   ### Comparison with Other Trackers
   
   1. **Fallback Tracker** (Line 411-413):
      ```scala
      override def newPartition(partitionValues: InternalRow): Unit =
        delegate.newPartition(partitionValues)
      ```
      ✅ Correctly delegates
   
   2. **Row Counting Tracker** (Line 491-492):
      ```scala
      override def newPartition(partitionValues: InternalRow): Unit =
        delegate.newPartition(partitionValues)
      ```
      ✅ Correctly delegates
   
   3. **Native Tracker** (Line 633):
      ```scala
      override def newPartition(partitionValues: InternalRow): Unit = {}
      ```
      ❌ **PROBLEM: Empty implementation**
   
   ## Conclusion
   
   The issue is in the 
`GlutenDeltaJobStatsNativeTracker.GlutenDeltaTaskStatsNativeTracker` class 
where the `newPartition()` method has an empty implementation. This causes 
partition values to be lost, resulting in incorrect partition key generation in 
the Delta Lake metadata.
   
   **Fix**: Implement proper partition value handling in the native tracker by:
   1. Storing and using partition values internally
   2. Delegating to the underlying tracker
   3. Both (recommended for robustness)
   
   The fix should ensure partition values are correctly associated with files 
and propagated to the final statistics.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

