yaooqinn commented on code in PR #12084:
URL: https://github.com/apache/gluten/pull/12084#discussion_r3253171724


##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala:
##########
@@ -128,10 +128,22 @@ object ExecUtil {
           ascending = true,
           samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
         Some(part)
+      case k @ KeyGroupedPartitioning(_, n, _, _) =>
+        // Build a lookup map from partition key values to partition indices.
+        // KeyGroupedPartitioner.getPartition uses getOrElseUpdate: if a key is not found
+        // in the map, it falls back to hash-based assignment (nonNegativeMod). This is a
+        // best-effort fallback - all expected keys should be present in uniquePartitionValues.
+        val dataTypes = k.expressions.map(_.dataType)
+        val valueMap = scala.collection.mutable.Map.empty[Seq[Any], Int]
+        k.uniquePartitionValues.zipWithIndex.foreach {
+          case (partition, index) =>
+            valueMap.update(partition.toSeq(dataTypes), index)

Review Comment:
   **[BLOCKING] `numPartitions` / `valueMap.size` mismatch when 
`isPartiallyClustered=true` or `partitionValues` contain duplicates**
   
   `KeyGroupedPartitioner` is constructed with the original `n = 
KeyGroupedPartitioning.numPartitions` (i.e. `partitionValues.size`, including 
replicas), while `valueMap` is built from the deduped `uniquePartitionValues`. 
When `isPartiallyClustered=true` (the SPJ skew-handling path), 
`partitionValues` deliberately replicates entries → `numPartitions > 
uniquePartitionValues.size`. Result:
   
   - pids `[uniquePartitionValues.size, n)` are unreachable via the map lookup.
   - The `getOrElseUpdate` hash fallback (`nonNegativeMod(key.hashCode, n)`) 
can place any unknown key in `[0, uniquePartitionValues.size)`, **colliding with an 
already-occupied pid**.
   - Two distinct partition keys then end up in the same output partition, 
breaking the KGP per-partition-uniqueness invariant that downstream SPJ-aware 
operators rely on.
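   
   The collision can be reproduced in isolation. The following is a minimal 
sketch, not Spark or Gluten code: `Key`, `SketchPartitioner` and `MismatchDemo` 
are hypothetical stand-ins, and `Key` overrides `hashCode` only so that the 
fallback pid is predictable.
   
   ```scala
   import scala.collection.mutable

   // Hypothetical key type with a controllable hash, so the fallback pid is predictable.
   final case class Key(name: String, hash: Int) {
     override def hashCode: Int = hash
   }

   // Simplified stand-in for KeyGroupedPartitioner (assumption, not Spark's class):
   // map lookup first, hash-based fallback for keys missing from the map.
   final class SketchPartitioner(valueMap: mutable.Map[Key, Int], val numPartitions: Int) {
     private def nonNegativeMod(x: Int, mod: Int): Int = {
       val raw = x % mod
       if (raw < 0) raw + mod else raw
     }

     def getPartition(key: Key): Int =
       valueMap.getOrElseUpdate(key, nonNegativeMod(key.hashCode, numPartitions))
   }

   object MismatchDemo {
     val a: Key = Key("a", 100)
     val b: Key = Key("b", 101)
     val c: Key = Key("c", 102)

     // Partially clustered input: "a" is replicated, so n = 4 but only 3 keys are distinct.
     val partitionValues: Seq[Key] = Seq(a, a, b, c)
     val uniquePartitionValues: Seq[Key] = partitionValues.distinct
     val n: Int = partitionValues.size

     // Same construction as in the diff: index the deduped values.
     val valueMap: mutable.Map[Key, Int] = mutable.Map.empty[Key, Int]
     uniquePartitionValues.zipWithIndex.foreach { case (k, i) => valueMap.update(k, i) }

     val partitioner = new SketchPartitioner(valueMap, n)

     // pids reachable through the map lookup: 0, 1, 2. pid 3 is never assigned by lookup.
     val reachable: Set[Int] = uniquePartitionValues.map(partitioner.getPartition).toSet

     // An unknown key with hash 5 falls back to 5 % 4 = 1, the pid already owned by "b".
     val unknown: Key = Key("z", 5)
     val unknownPid: Int = partitioner.getPartition(unknown)
   }
   ```
   
   Here `MismatchDemo.unknownPid` equals the pid of `b`, while pid 3 stays 
empty, which is the situation described in the bullets above.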
   
   For reference, the KGP branch of `ShuffleExchangeExec` on vanilla Spark `master` 
([`ShuffleExchangeExec.scala:378-383`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L378-L383))
 first calls `k.toGrouped`, which dedups `partitionKeys` and redefines 
`numPartitions := partitionKeys.length`, *before* constructing the 
`KeyGroupedPartitioner`. That canonicalization is what guarantees 
`valueMap.size == numPartitions`. (Spark 3.5's `ShuffleExchangeExec` has no KGP 
branch at all: KGP is consumed only by `BatchScanExec` and never re-shuffled, 
so the problem doesn't arise there either.)
   
   Suggested fixes (any one):
   1. Use `k.uniquePartitionValues.size` as the partitioner's `numPartitions` 
instead of `n` (matches the dedup invariant).
   2. In `ColumnarShuffleExchangeExecBase.scala:107` reject 
`isPartiallyClustered=true` and any case where `partitionValues.size != 
uniquePartitionValues.size` via `ValidationResult.failed`, restricting the 
columnar path to the strictly-1:1 case.
   3. Port the `.toGrouped`-equivalent canonicalization (dedup + renumber) 
before building `valueMap` and the partitioner.
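   
   As a rough illustration of what fixes 1 and 3 have in common, the dedup + 
renumber step could look like the sketch below. `CanonicalizeSketch` and 
`canonicalize` are hypothetical names, not Spark or Gluten APIs, and the key 
type is left generic for brevity.
   
   ```scala
   object CanonicalizeSketch {
     // Hypothetical helper: dedup partition values in first-seen order and assign
     // dense pids 0 until unique.size. Returns the lookup map together with the
     // numPartitions value the partitioner should be built with.
     def canonicalize[K](partitionValues: Seq[K]): (Map[K, Int], Int) = {
       val unique = partitionValues.distinct
       (unique.zipWithIndex.toMap, unique.size)
     }
   }
   ```
   
   With this shape, the map size and the partition count come from the same 
deduped sequence, so every pid in `[0, numPartitions)` is owned by exactly one 
expected key. The sketch does not change the hash fallback for unexpected keys.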



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

