This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 5d5f1030cb08 [SPARK-46367][SQL][FOLLOWUP] Assert same arity in projectKeyedPartitionings
5d5f1030cb08 is described below

commit 5d5f1030cb087fd96ee03bef2d7202edf0cbb892
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri May 15 11:41:28 2026 +0800

    [SPARK-46367][SQL][FOLLOWUP] Assert same arity in projectKeyedPartitionings
    
    ### What changes were proposed in this pull request?
    Follow-up to https://github.com/apache/spark/pull/55519.
    
    `PartitioningPreservingUnaryExecNode.projectKeyedPartitionings` assumes all input KPs share the same `partitionKeys`, which implies the same expression arity. This invariant is asserted by `GroupPartitionsExec` and is established by every upstream constructor of `PartitioningCollection` that feeds this method (a join's `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs that `EnsureRequirements` has aligned to the same join keys).
    
    If the invariant is ever violated upstream, indexing `kp.expressions(i)` for `i >= kp.expressions.length` throws an opaque `IndexOutOfBoundsException` that points at this method rather than at the producer.
    
    This PR adds an `assert` that surfaces the real cause with a clear message. The invariant is unchanged; this just turns silent misuse into a debuggable failure, so a producer-side bug can be fixed at its source.
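    
    A minimal, Spark-free sketch of the fail-fast check described above. `KP`, `ArityCheckSketch`, `alternativesPerPosition`, and `failureKind` are hypothetical stand-ins introduced only for illustration; they are not Spark APIs and only mirror the shape of the positional indexing in `projectKeyedPartitionings`.
    
    ```scala
    // Hypothetical, simplified stand-in for Spark's KeyedPartitioning; only the
    // expression list matters for this illustration.
    final case class KP(expressions: Seq[String])
    
    object ArityCheckSketch {
      // For each key position, gather the expression found at that position in every input.
      def alternativesPerPosition(kps: Seq[KP], checkArity: Boolean): IndexedSeq[Seq[String]] = {
        if (kps.isEmpty) {
          IndexedSeq.empty
        } else {
          val numPositions = kps.head.expressions.length
          if (checkArity) {
            // Fail fast with a message naming the violated invariant and the observed arities.
            val arities = kps.map(_.expressions.length).mkString(", ")
            assert(kps.tail.forall(_.expressions.length == numPositions),
              "All input KeyedPartitionings must share the same expression arity, " +
                s"but got: $arities.")
          }
          // Positional indexing: a shorter input fails here when the check is skipped.
          (0 until numPositions).map(i => kps.map(_.expressions(i)))
        }
      }
    
      // Runs the function and reports which kind of Throwable (if any) escaped.
      def failureKind(kps: Seq[KP], checkArity: Boolean): String =
        try {
          alternativesPerPosition(kps, checkArity)
          "none"
        } catch {
          case _: AssertionError => "AssertionError"
          case _: IndexOutOfBoundsException => "IndexOutOfBoundsException"
        }
    
      def main(args: Array[String]): Unit = {
        val mixed = Seq(KP(Seq("x", "y")), KP(Seq("x")))
        println(failureKind(mixed, checkArity = false)) // failure surfaces at the indexing site
        println(failureKind(mixed, checkArity = true))  // failure names the violated invariant
      }
    }
    ```
    
    The sketch assumes assertions are enabled; Scala's `assert` is elidable at compile time, in which case the indexing failure would reappear.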
    
    ### Why are the changes needed?
    Improve error message when an upstream node violates the same-arity invariant. No behavior change on valid plans.
    
    ### Does this PR introduce _any_ user-facing change?
    No (assertion-only; planner internal).
    
    ### How was this patch tested?
    New unit test `SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion` in `ProjectedOrderingAndPartitioningSuite` that constructs a mixed-arity `PartitioningCollection` child and verifies the assert fires with the expected message instead of throwing `IndexOutOfBoundsException`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Claude Opus 4.7
    
    Closes #55876 from cloud-fan/SPARK-46367-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 2530561c24939fb8eaf8237889970203c23b6611)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/AliasAwareOutputExpression.scala | 10 ++++++++++
 .../ProjectedOrderingAndPartitioningSuite.scala    | 23 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index 910cbcf2210a..1f2b1d0a585d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -94,6 +94,16 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
       kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = {
     if (kps.isEmpty) return LazyList.empty
     val numPositions = kps.head.expressions.length
+    // The function assumes all input KPs share the same `partitionKeys`, which implies matching
+    // expression arity. This invariant is asserted by [[GroupPartitionsExec]] and is established
+    // by the constructors of [[PartitioningCollection]] feeding this method (a join's
+    // `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs
+    // that have been aligned by [[EnsureRequirements]] to the same join keys). If the invariant
+    // is ever violated upstream, fail early with a clear message instead of throwing an opaque
+    // `IndexOutOfBoundsException` from `kp.expressions(i)` below.
+    assert(kps.tail.forall(_.expressions.length == numPositions),
+      s"All input KeyedPartitionings must share the same expression arity, " +
+        s"but got: ${kps.map(_.expressions.length).mkString(", ")}.")
 
     val alternativesPerPosition: IndexedSeq[LazyList[Expression]] =
       if (hasAlias) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
index db664b04ef08..a38570924620 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -586,6 +586,29 @@ class ProjectedOrderingAndPartitioningSuite
       case other => fail(s"Expected KeyedPartitioning, got $other")
     }
   }
+
+  test("SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion") {
+    // The function assumes all input KPs share the same arity (the invariant asserted by
+    // `GroupPartitionsExec`). Without the assert below, indexing `kp.expressions(i)` for
+    // `i >= kp.expressions.length` would throw an opaque `IndexOutOfBoundsException`. The assert
+    // surfaces the real cause -- an upstream node violated the invariant -- so the bug can be
+    // fixed at the producer.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(2, 2))
+    val keys1d = Seq(InternalRow(1), InternalRow(2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = PartitioningCollection(Seq(
+        KeyedPartitioning(Seq(x, y), keys2d),
+        KeyedPartitioning(Seq(x), keys1d))))
+    val project = ProjectExec(Seq(x), child)
+    val e = intercept[AssertionError] {
+      project.outputPartitioning
+    }
+    assert(e.getMessage.contains("All input KeyedPartitionings must share the same expression " +
+      "arity"))
+  }
 }
 
 private case class DummyLeafExecWithPartitioning(

