FelixYBW commented on PR #11419:
URL: https://github.com/apache/incubator-gluten/pull/11419#issuecomment-3889033830
See if it's useful:
# Analysis of PR #11419 - Partition Key Generation Issue
## PR Overview
**Title**: [GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R overhead
**Purpose**: Adds a native job statistics tracker for Delta write that eliminates Columnar-to-Row (C2R) conversion overhead by using Velox's native aggregation capabilities.
## Issue: Wrong Partition Key Generation
### Root Cause Analysis
The PR introduces a new native statistics tracker
(`GlutenDeltaJobStatsNativeTracker`) that has an **empty implementation** for
the `newPartition` method:
```scala
override def newPartition(partitionValues: InternalRow): Unit = {}
```
**Location in patch**: Line 633
### Problem Explanation
#### Before the PR:
The fallback tracker properly delegates partition information:
```scala
override def newPartition(partitionValues: InternalRow): Unit =
  delegate.newPartition(partitionValues)
```
#### After the PR:
The native tracker **ignores** partition values:
```scala
override def newPartition(partitionValues: InternalRow): Unit = {}
```
### Why This Causes Wrong Partition Keys
1. **Missing Partition Context**: When `newPartition()` is called with
partition values, the native tracker doesn't store or propagate this
information to the underlying Delta statistics collection mechanism.
2. **File-to-Partition Mapping Lost**: The tracker creates accumulators per
file path but doesn't associate them with their partition values:
```scala
override def newFile(filePath: String): Unit = {
  accumulators.getOrElseUpdate(
    filePath,
    new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
  )
}
```
3. **Statistics Without Partition Info**: When statistics are collected,
they lack the partition context needed to generate correct partition keys in
the Delta log.
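The failure mode above can be shown with a self-contained sketch. The class and method names below are hypothetical simplifications, not the actual Gluten/Spark types: the writer calls `newPartition` once per partition directory, then `newFile` for each file written into it, so a no-op `newPartition` means no file-to-partition association ever exists.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the tracker contract.
trait TaskStatsTracker {
  def newPartition(partitionValues: Seq[String]): Unit
  def newFile(filePath: String): Unit
  def fileToPartition: Map[String, Seq[String]]
}

// Mirrors the PR's native tracker: partition values are silently dropped.
class NoOpPartitionTracker extends TaskStatsTracker {
  override def newPartition(partitionValues: Seq[String]): Unit = {}
  override def newFile(filePath: String): Unit = ()
  override def fileToPartition: Map[String, Seq[String]] = Map.empty
}

// Mirrors the proposed fix: remember the current partition, associate files.
class PartitionAwareTracker extends TaskStatsTracker {
  private var current: Seq[String] = Nil
  private val mapping = mutable.Map[String, Seq[String]]()
  override def newPartition(partitionValues: Seq[String]): Unit =
    current = partitionValues
  override def newFile(filePath: String): Unit =
    if (current.nonEmpty) mapping(filePath) = current
  override def fileToPartition: Map[String, Seq[String]] = mapping.toMap
}

val broken = new NoOpPartitionTracker
broken.newPartition(Seq("date=2024-01-01"))
broken.newFile("part-00000.parquet")
// broken.fileToPartition is empty: the partition key never reaches the stats

val fixed = new PartitionAwareTracker
fixed.newPartition(Seq("date=2024-01-01"))
fixed.newFile("part-00000.parquet")
// fixed.fileToPartition maps the file to Seq("date=2024-01-01")
```

The call sequence is the same in both cases; only the tracker's handling of `newPartition` differs, which is exactly the gap the PR introduces.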
### Impact
- **Partition columns are not correctly tracked** in the statistics
- **Delta Lake metadata** may have incorrect or missing partition information
- **Query performance degradation** due to incorrect partition pruning
- **Data correctness issues** if partition-based operations rely on this
metadata
## Solution
### Option 1: Store and Use Partition Values (Recommended)
```scala
private class GlutenDeltaTaskStatsNativeTracker(
    delegate: WriteTaskStatsTracker,
    dataCols: Seq[Attribute],
    statsColExpr: Expression,
    resultThreadRunner: ThreadPoolExecutor)
  extends WriteTaskStatsTracker {

  private val accumulators = mutable.Map[String, VeloxTaskStatsAccumulator]()
  private val fileToPartition = mutable.Map[String, InternalRow]() // ADD THIS
  private var currentPartitionValues: InternalRow = _              // ADD THIS
  private val evaluator = NativePlanEvaluator.create(
    BackendsApiManager.getBackendName,
    Map.empty[String, String].asJava)

  override def newPartition(partitionValues: InternalRow): Unit = {
    // Store current partition values for subsequent file operations
    currentPartitionValues = partitionValues // ADD THIS
  }

  override def newFile(filePath: String): Unit = {
    accumulators.getOrElseUpdate(
      filePath,
      new VeloxTaskStatsAccumulator(evaluator, resultThreadRunner, dataCols, statsColExpr)
    )
    // Associate the file with the partition it was opened under
    if (currentPartitionValues != null) { // ADD THIS
      fileToPartition(filePath) = currentPartitionValues.copy()
    }
  }

  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
    // Use the fileToPartition mapping when building statistics
    // to ensure correct partition keys
    // ... implementation needs to pass partition info to delegate
  }
}
```
### Option 2: Delegate to Underlying Tracker
If the native tracker doesn't need to handle partitions directly:
```scala
override def newPartition(partitionValues: InternalRow): Unit = {
delegate.newPartition(partitionValues)
}
```
### Option 3: Hybrid Approach
Store partition values AND delegate:
```scala
private var currentPartitionValues: InternalRow = _
override def newPartition(partitionValues: InternalRow): Unit = {
currentPartitionValues = partitionValues
delegate.newPartition(partitionValues)
}
```
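One subtlety worth making explicit for Options 1 and 3: Spark writers commonly reuse the same mutable `InternalRow` instance across calls, which is why the Option 1 sketch calls `copy()` before storing the values in the map. A simplified model (using a hypothetical `ReusedRow` wrapping an `Array` in place of `InternalRow`) shows what goes wrong when only the reference is kept:

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's mutable InternalRow, which the writer
// may reuse across newPartition calls (all names here are hypothetical).
final class ReusedRow(val values: Array[String]) {
  def copyRow: ReusedRow = new ReusedRow(values.clone())
}

class TrackerStoringReference {
  val seen = mutable.ListBuffer[ReusedRow]()
  def newPartition(row: ReusedRow): Unit = seen += row // keeps the live buffer
}

class TrackerStoringCopy {
  val seen = mutable.ListBuffer[ReusedRow]()
  def newPartition(row: ReusedRow): Unit = seen += row.copyRow
}

val row = new ReusedRow(Array("2024-01-01"))
val byRef = new TrackerStoringReference
val byCopy = new TrackerStoringCopy
byRef.newPartition(row)
byCopy.newPartition(row)
row.values(0) = "2024-01-02" // writer reuses the buffer for the next partition
// byRef.seen.head.values(0)  is now "2024-01-02" -- first key corrupted
// byCopy.seen.head.values(0) is still "2024-01-01" -- preserved
```

Delegating (Option 2/3) sidesteps this because the underlying tracker is responsible for copying; any values stored locally still need a defensive copy.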
## Testing Recommendations
1. **Add partition-specific tests**:
```scala
test("native stats tracker preserves partition values") {
// Write data with multiple partitions
// Verify partition keys in Delta log match expected values
}
```
2. **Verify statistics correctness**:
```scala
test("partition statistics are correctly computed") {
// Check min/max values per partition
// Verify row counts per partition
}
```
3. **Test partition pruning**:
```scala
test("queries with partition filters use correct statistics") {
// Write partitioned data
// Query with partition filter
// Verify only relevant partitions are scanned
}
```
## Related Code Sections
### Comparison with Other Trackers
1. **Fallback Tracker** (Lines 411-413):
```scala
override def newPartition(partitionValues: InternalRow): Unit =
delegate.newPartition(partitionValues)
```
✅ Correctly delegates
2. **Row Counting Tracker** (Lines 491-492):
```scala
override def newPartition(partitionValues: InternalRow): Unit =
delegate.newPartition(partitionValues)
```
✅ Correctly delegates
3. **Native Tracker** (Line 633):
```scala
override def newPartition(partitionValues: InternalRow): Unit = {}
```
❌ **PROBLEM: Empty implementation**
## Conclusion
The issue is in the
`GlutenDeltaJobStatsNativeTracker.GlutenDeltaTaskStatsNativeTracker` class
where the `newPartition()` method has an empty implementation. This causes
partition values to be lost, resulting in incorrect partition key generation in
the Delta Lake metadata.
**Fix**: Implement proper partition value handling in the native tracker,
either by:
1. Storing and using partition values internally
2. Delegating to the underlying tracker
3. Both (recommended for robustness)
The fix should ensure partition values are correctly associated with files
and propagated to the final statistics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]