This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8915c6078527 [SPARK-52705][SQL] Refactor deterministic check for
grouping expressions
8915c6078527 is described below
commit 8915c6078527c14d1d6086ed94e21bac74ba64dc
Author: Mihailo Timotic <[email protected]>
AuthorDate: Tue Jul 8 18:32:42 2025 +0800
[SPARK-52705][SQL] Refactor deterministic check for grouping expressions
### What changes were proposed in this pull request?
Move check for non-deterministic expressions in grouping expressions from
`ExprUtils` to `CheckAnalysis`.
### Why are the changes needed?
This is necessary in order to be able to utilize the `PullOutNondeterministic`
rule as a post-processing rewrite rule in the single-pass analyzer. Because
`ExprUtils.assertValidAggregation` is called during the bottom-up traversal, we
can't check for non-deterministic expressions there.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51391 from mihailotim-db/mihailotim-db/pull_out_nondeterministic.
Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 14 +++++++++++++-
.../sql/catalyst/analysis/PullOutNondeterministic.scala | 14 +++++++++++++-
.../apache/spark/sql/catalyst/expressions/ExprUtils.scala | 12 ------------
3 files changed, 26 insertions(+), 14 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 863398de9cc9..5a58c24bc190 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -569,7 +569,19 @@ trait CheckAnalysis extends LookupCatalog with
QueryErrorsBase with PlanToString
messageParameters = Map.empty)
}
- case a: Aggregate => ExprUtils.assertValidAggregation(a)
+ case a: Aggregate =>
+ a.groupingExpressions.foreach(
+ expression =>
+ if (!expression.deterministic) {
+ throw SparkException.internalError(
+ msg = s"Non-deterministic expression
'${toSQLExpr(expression)}' should not " +
+ "appear in grouping expression.",
+ context = expression.origin.getQueryContext,
+ summary = expression.origin.context.summary
+ )
+ }
+ )
+ ExprUtils.assertValidAggregation(a)
case CollectMetrics(name, metrics, _, _) =>
if (name == null || name.isEmpty) {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
index e0b984540cac..6769babdd1f1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
import scala.jdk.CollectionConverters._
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.ExprUtils.toSQLExpr
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -39,10 +41,20 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
val nondeterToAttr =
NondeterministicExpressionCollection.getNondeterministicToAttributes(a.groupingExpressions)
val newChild = Project(a.child.output ++
nondeterToAttr.values.asScala.toSeq, a.child)
- a.transformExpressions { case e =>
+ val deterministicAggregate = a.transformExpressions { case e =>
Option(nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e)
}.copy(child = newChild)
+ deterministicAggregate.groupingExpressions.foreach(expr => if
(!expr.deterministic) {
+ throw SparkException.internalError(
+ msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not
appear in " +
+ "grouping expression.",
+ context = expr.origin.getQueryContext,
+ summary = expr.origin.context.summary)
+ })
+
+ deterministicAggregate
+
// Don't touch collect metrics. Top-level metrics are not supported (check
analysis will fail)
// and we want to retain them inside the aggregate functions.
case m: CollectMetrics => m
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index 8b7d641828ba..783de160f83b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition}
import java.util.Locale
-import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch,
TypeCheckSuccess}
@@ -209,17 +208,6 @@ object ExprUtils extends EvalHelper with QueryErrorsBase {
"sqlExpr" -> toSQLExpr(expr),
"dataType" -> toSQLType(expr.dataType)))
}
-
- if (!expr.deterministic) {
- // This is just a sanity check, our analysis rule
PullOutNondeterministic should
- // already pull out those nondeterministic expressions and evaluate
them in
- // a Project node.
- throw SparkException.internalError(
- msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not
appear in " +
- "grouping expression.",
- context = expr.origin.getQueryContext,
- summary = expr.origin.context.summary)
- }
}
a.groupingExpressions.foreach(checkValidGroupingExprs)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]