zhztheplayer commented on PR #11419:
URL:
https://github.com/apache/incubator-gluten/pull/11419#issuecomment-3891877722
> See if it's useful:
>
> # Analysis of PR #11419 - Partition Key Generation Issue
> ## PR Overview
> **Title**: [GLUTEN-10215][VL] Delta write: Native statistics tracker to
eliminate C2R overhead
>
> **Purpose**: Adds a native job statistics tracker for Delta write to
eliminate Columnar-to-Row (C2R) conversion overhead by using Velox's native
aggregation capabilities.
>
> ## Issue: Wrong Partition Key Generation
> ### Root Cause Analysis
> The PR introduces a new native statistics tracker
(`GlutenDeltaJobStatsNativeTracker`) that has an **empty implementation** for
the `newPartition` method:
>
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit = {}
> ```
>
> **Location in patch**: Line 633
>
> ### Problem Explanation
> #### Before the PR:
> The fallback tracker properly delegates partition information:
>
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit =
>   delegate.newPartition(partitionValues)
> ```
>
> #### After the PR:
> The native tracker **ignores** partition values:
>
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit = {}
> ```
>
> ### Why This Causes Wrong Partition Keys
> 1. **Missing Partition Context**: When `newPartition()` is called with
partition values, the native tracker doesn't store or propagate this
information to the underlying Delta statistics collection mechanism.
> 2. **File-to-Partition Mapping Lost**: The tracker creates accumulators
per file path but doesn't associate them with their partition values:
> ```scala
> override def newFile(filePath: String): Unit = {
>   accumulators.getOrElseUpdate(
>     filePath,
>     new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
>   )
> }
> ```
> 3. **Statistics Without Partition Info**: When statistics are collected,
they lack the partition context needed to generate correct partition keys in
the Delta log.
>
> ### Impact
> * **Partition columns are not correctly tracked** in the statistics
> * **Delta Lake metadata** may have incorrect or missing partition
information
> * **Query performance degradation** due to incorrect partition pruning
> * **Data correctness issues** if partition-based operations rely on this
metadata
>
> ## Solution
> ### Option 1: Store and Use Partition Values (Recommended)
> ```scala
> private class GlutenDeltaTaskStatsNativeTracker(
>     delegate: WriteTaskStatsTracker,
>     dataCols: Seq[Attribute],
>     statsColExpr: Expression,
>     resultThreadRunner: ThreadPoolExecutor)
>   extends WriteTaskStatsTracker {
>
>   private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]()
>   private val fileToPartition = mutable.Map[String, InternalRow]() // ADD THIS
>   private var currentPartitionValues: InternalRow = _ // ADD THIS
>   private val evaluator = NativePlanEvaluator.create(
>     BackendsApiManager.getBackendName,
>     Map.empty[String, String].asJava)
>
>   override def newPartition(partitionValues: InternalRow): Unit = {
>     // Store current partition values for subsequent file operations
>     currentPartitionValues = partitionValues // ADD THIS
>   }
>
>   override def newFile(filePath: String): Unit = {
>     accumulators.getOrElseUpdate(
>       filePath,
>       new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
>     )
>     // Associate the file with its partition
>     if (currentPartitionValues != null) { // ADD THIS
>       fileToPartition(filePath) = currentPartitionValues.copy()
>     }
>   }
>
>   override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
>     // Use the fileToPartition mapping when building statistics
>     // to ensure correct partition keys.
>     // ... implementation needs to pass partition info to delegate
>     ???
>   }
> }
> ```
>
> ### Option 2: Delegate to Underlying Tracker
> If the native tracker doesn't need to handle partitions directly:
>
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit = {
>   delegate.newPartition(partitionValues)
> }
> ```
>
> ### Option 3: Hybrid Approach
> Store partition values AND delegate:
>
> ```scala
> private var currentPartitionValues: InternalRow = _
>
> override def newPartition(partitionValues: InternalRow): Unit = {
>   currentPartitionValues = partitionValues
>   delegate.newPartition(partitionValues)
> }
> ```
>
> ## Testing Recommendations
> 1. **Add partition-specific tests**:
> ```scala
> test("native stats tracker preserves partition values") {
>   // Write data with multiple partitions
>   // Verify partition keys in Delta log match expected values
> }
> ```
> 2. **Verify statistics correctness**:
> ```scala
> test("partition statistics are correctly computed") {
>   // Check min/max values per partition
>   // Verify row counts per partition
> }
> ```
> 3. **Test partition pruning**:
> ```scala
> test("queries with partition filters use correct statistics") {
>   // Write partitioned data
>   // Query with partition filter
>   // Verify only relevant partitions are scanned
> }
> ```
>
> ## Related Code Sections
> ### Comparison with Other Trackers
> 1. **Fallback Tracker** (Line 411-413):
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit =
>   delegate.newPartition(partitionValues)
> ```
>
> ✅ Correctly delegates
> 2. **Row Counting Tracker** (Line 491-492):
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit =
>   delegate.newPartition(partitionValues)
> ```
>
> ✅ Correctly delegates
> 3. **Native Tracker** (Line 633):
> ```scala
> override def newPartition(partitionValues: InternalRow): Unit = {}
> ```
>
>
> ❌ **PROBLEM: Empty implementation**
>
> ## Conclusion
> The issue is in the
`GlutenDeltaJobStatsNativeTracker.GlutenDeltaTaskStatsNativeTracker` class
where the `newPartition()` method has an empty implementation. This causes
partition values to be lost, resulting in incorrect partition key generation in
the Delta Lake metadata.
>
> **Fix**: Implement proper partition value handling in the native tracker,
either by:
>
> 1. Storing and using partition values internally
> 2. Delegating to the underlying tracker
> 3. Both (recommended for robustness)
>
> The fix should ensure partition values are correctly associated with files
and propagated to the final statistics.
@FelixYBW Just saw this. This was actually not the root cause: `override def
newPartition(partitionValues: InternalRow): Unit = { }` was intentionally left
empty because the vanilla Delta stats writer also does nothing in this method.
The problem was in `VeloxBlockStripes`, and I've already pushed the fix, so
everything should be fine now.
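A simplified model of why an empty `newPartition` can be harmless: if the stats tracker keys its per-file statistics by file path, the partition values recorded for each file can be recovered on the commit side from the Hive-style partition directory in that path. Our understanding is that Delta's commit protocol works along these lines, but treat that as an assumption here. All names below (`TaskStatsTracker`, `PerFileRowCountTracker`, `CommitSide`, `Demo`) are hypothetical, and plain Scala maps stand in for Spark's `WriteTaskStatsTracker` and `InternalRow`; this is a sketch, not the Gluten or Delta code.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the WriteTaskStatsTracker lifecycle.
// Partition values are plain string maps here instead of InternalRow.
trait TaskStatsTracker {
  def newPartition(partitionValues: Map[String, String]): Unit
  def newFile(filePath: String): Unit
  def newRow(filePath: String, row: Seq[Any]): Unit
  def closeFile(filePath: String): Unit
  def getFinalStats(): Map[String, Long]
}

// Collects per-file row counts. newPartition is intentionally a no-op:
// stats are keyed by file path, and the partition directory is already
// part of that path.
final class PerFileRowCountTracker extends TaskStatsTracker {
  private val rowCounts = mutable.LinkedHashMap.empty[String, Long]

  override def newPartition(partitionValues: Map[String, String]): Unit = ()

  override def newFile(filePath: String): Unit = {
    rowCounts.getOrElseUpdate(filePath, 0L)
    ()
  }

  override def newRow(filePath: String, row: Seq[Any]): Unit =
    rowCounts(filePath) = rowCounts(filePath) + 1

  override def closeFile(filePath: String): Unit = ()

  override def getFinalStats(): Map[String, Long] = rowCounts.toMap
}

// Hypothetical commit-side step: recovers partition values from a
// Hive-style path such as "p=1/part-00000.parquet" (no URL unescaping).
object CommitSide {
  def partitionValuesFromPath(path: String): Map[String, String] =
    path.split("/").dropRight(1).toSeq.collect {
      case seg if seg.contains("=") =>
        val i = seg.indexOf('=')
        seg.substring(0, i) -> seg.substring(i + 1)
    }.toMap

  // One (path, partitionValues, rowCount) entry per file, sorted by path.
  def buildFileEntries(stats: Map[String, Long]): Seq[(String, Map[String, String], Long)] =
    stats.toSeq.sortBy(_._1).map { case (path, rows) =>
      (path, partitionValuesFromPath(path), rows)
    }
}

object Demo {
  // Simulates one task writing two partitions, one file each.
  def run(): Seq[(String, Map[String, String], Long)] = {
    val tracker = new PerFileRowCountTracker
    val f1 = "p=1/part-00000.parquet"
    val f2 = "p=2/part-00001.parquet"

    tracker.newPartition(Map("p" -> "1")) // ignored by the tracker
    tracker.newFile(f1)
    (1 to 3).foreach(i => tracker.newRow(f1, Seq(i)))
    tracker.closeFile(f1)

    tracker.newPartition(Map("p" -> "2")) // ignored by the tracker
    tracker.newFile(f2)
    tracker.newRow(f2, Seq(4))
    tracker.closeFile(f2)

    CommitSide.buildFileEntries(tracker.getFinalStats())
  }
}
```

Calling `Demo.run()` yields one entry per file, with `Map("p" -> "1")` and 3 rows for the first file and `Map("p" -> "2")` and 1 row for the second, even though `newPartition` did nothing. Real partition paths may also escape special characters, which this sketch ignores.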
--
This is an automated message from the Apache Git Service.