This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0e52b59b540f [SPARK-47939][SQL] Implement a new Analyzer rule to move ParameterizedQuery inside ExplainCommand and DescribeQueryCommand
0e52b59b540f is described below

commit 0e52b59b540fac85972453093805c76b4f71cb94
Author:     Vladimir Golubev <vladimir.golu...@databricks.com>
AuthorDate: Mon Apr 29 17:00:32 2024 +0800

    [SPARK-47939][SQL] Implement a new Analyzer rule to move ParameterizedQuery inside ExplainCommand and DescribeQueryCommand

    ### What changes were proposed in this pull request?

    Mark `DescribeQueryCommand` and `ExplainCommand` as `SupervisingCommand` (they do not expose their wrapped nodes as children, but supervise them internally). Introduce a new Analyzer rule, `MoveParameterizedQueriesDown`, which moves `ParameterizedQuery` inside the `SupervisingCommand` so that parameters are resolved correctly.

    ### Why are the changes needed?

    Parameterized `EXPLAIN` and `DESCRIBE` queries:
    - `spark.sql("describe select ?", Array(1)).show();`
    - `spark.sql("explain select ?", Array(1)).show();`

    fail with:

        org.apache.spark.sql.catalyst.ExtendedAnalysisException: [UNBOUND_SQL_PARAMETER] Found the unbound parameter: _16. Please, fix `args` and provide a mapping of the parameter to either a SQL literal or collection constructor functions such as `map()`, `array()`, `struct()`. SQLSTATE: 42P02; line 1 pos 16;
        'Project [unresolvedalias(posparameter(16))]
        +- OneRowRelation

    ### Does this PR introduce _any_ user-facing change?

    Yes: parameterized `EXPLAIN` and `DESCRIBE` queries now work for users.

    ### How was this patch tested?

    - Ran `sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala`
    - Ran `sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala`
    - New tests added to `SQLQuerySuite`

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46209 from vladimirg-db/vladimirg-db/make-explain-and-describe-work-with-parameters.

    Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
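Before the diff, a minimal self-contained sketch of the rewrite the new rule performs. The case classes below are simplified stand-ins for the Catalyst nodes, not the real `PosParameterizedQuery`/`ExplainCommand`, so this models the transformation only:

```scala
object MoveDownSketch extends App {
  // Toy stand-ins for the real Catalyst nodes, for illustration only.
  sealed trait Plan
  case class Query(sql: String) extends Plan
  case class Explain(supervised: Plan) extends Plan
  case class Parameterized(child: Plan, args: Seq[Any]) extends Plan

  // Push Parameterized beneath any chain of Explain nodes, mirroring how
  // MoveParameterizedQueriesDown recurses into nested supervising commands.
  def moveDown(plan: Plan): Plan = plan match {
    case Parameterized(child, args) =>
      def rewrite(p: Plan): Plan = p match {
        case Explain(inner) => Explain(rewrite(inner))
        case other          => Parameterized(other, args)
      }
      rewrite(child)
    case other => other
  }

  // Parameterized(Explain(Explain(q))) becomes Explain(Explain(Parameterized(q))).
  assert(
    moveDown(Parameterized(Explain(Explain(Query("select ?"))), Seq(1))) ==
      Explain(Explain(Parameterized(Query("select ?"), Seq(1)))))
}
```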
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   1 +
 .../spark/sql/catalyst/analysis/parameters.scala   |  64 ++++++++--
 .../spark/sql/catalyst/plans/logical/Command.scala |  11 ++
 .../spark/sql/execution/command/commands.scala     |   7 +-
 .../spark/sql/execution/command/tables.scala       |   5 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 141 +++++++++++++++++++++
 6 files changed, 218 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4b753e1f28e5..c29432c916f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -325,6 +325,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       RewriteDeleteFromTable ::
       RewriteUpdateTable ::
       RewriteMergeIntoTable ::
+      MoveParameterizedQueriesDown ::
       BindParameters ::
       typeCoercionRules() ++
       Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index f1cc44b270bc..5b365a0d49ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, CreateMap, CreateNamedStruct, Expression, LeafExpression, Literal, MapFromArrays, MapFromEntries, SubqueryExpression, Unevaluable, VariableReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.types.DataType
 
@@ -104,12 +104,64 @@ case class PosParameterizedQuery(child: LogicalPlan, args: Seq[Expression])
     copy(child = newChild)
 }
 
+/**
+ * Base class for rules that process parameterized queries.
+ */
+abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
+  def assertUnresolvedPlanHasSingleParameterizedQuery(plan: LogicalPlan): Unit = {
+    if (plan.containsPattern(PARAMETERIZED_QUERY)) {
+      val parameterizedQueries = plan.collect { case p: ParameterizedQuery => p }
+      assert(parameterizedQueries.length == 1)
+    }
+  }
+}
+
+/**
+ * Moves `ParameterizedQuery` inside `SupervisingCommand` so that its supervised plan can be
+ * resolved later by the analyzer.
+ *
+ * - Basic case:
+ *   `PosParameterizedQuery(ExplainCommand(SomeQuery(...)))` =>
+ *   `ExplainCommand(PosParameterizedQuery(SomeQuery(...)))`
+ * - Nested `SupervisingCommand`s are handled recursively:
+ *   `PosParameterizedQuery(ExplainCommand(ExplainCommand(SomeQuery(...))))` =>
+ *   `ExplainCommand(ExplainCommand(PosParameterizedQuery(SomeQuery(...))))`
+ */
+object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
+
+    plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
+      case pq: ParameterizedQuery if pq.exists(isSupervisingCommand) =>
+        moveParameterizedQueryIntoSupervisingCommand(pq)
+    }
+  }
+
+  private def moveParameterizedQueryIntoSupervisingCommand(pq: ParameterizedQuery): LogicalPlan = {
+    // Moves the parameterized query down recursively to handle nested `SupervisingCommand`s
+    def transformSupervisedPlan: PartialFunction[LogicalPlan, LogicalPlan] = {
+      case command: SupervisingCommand =>
+        command.withTransformedSupervisedPlan {
+          transformSupervisedPlan(_)
+        }
+      case plan => pq.withNewChildren(Seq(plan))
+    }
+
+    pq.child.resolveOperatorsWithPruning(_.containsPattern(COMMAND)) {
+      case command: SupervisingCommand => transformSupervisedPlan(command)
+    }
+  }
+
+  private def isSupervisingCommand(plan: LogicalPlan): Boolean =
+    plan.containsPattern(COMMAND) && plan.isInstanceOf[SupervisingCommand]
+}
+
 /**
  * Finds all named parameters in `ParameterizedQuery` and substitutes them by literals or
  * by collection constructor functions such as `map()`, `array()`, `struct()`
  * from the user-specified arguments.
  */
-object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
+object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
   private def checkArgs(args: Iterable[(String, Expression)]): Unit = {
     def isNotAllowed(expr: Expression): Boolean = expr.exists {
       case _: Literal | _: CreateArray | _: CreateNamedStruct |
@@ -131,11 +183,7 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (plan.containsPattern(PARAMETERIZED_QUERY)) {
-      // One unresolved plan can have at most one ParameterizedQuery.
-      val parameterizedQueries = plan.collect { case p: ParameterizedQuery => p }
-      assert(parameterizedQueries.length == 1)
-    }
+    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
 
     plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
       // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
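As a quick usage sketch of what this enables end to end (assuming a `SparkSession` named `spark` built from this branch; the expected rows come from the new `SQLQuerySuite` tests further down):

```scala
// Both of these failed with UNBOUND_SQL_PARAMETER before this change.
// Per the new tests, the describe result is a single row: ("1", "int", null).
spark.sql("describe select ?", Array(1)).show()
spark.sql("describe select :first", Map("first" -> 1)).show()

// EXPLAIN now resolves the parameter too; the plan prints the bound literal.
spark.sql("explain select :first", Map("first" -> 1)).show(truncate = false)
```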
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index fc9eb5d03e49..bd277e92d11d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -63,3 +63,14 @@ trait AnalysisOnlyCommand extends Command {
   // on the `AnalysisContext`
   def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan
 }
+
+/**
+ * A logical node that does not expose its sub-nodes as children, but rather supervises them
+ * in an implementation-defined manner.
+ */
+trait SupervisingCommand extends LeafCommand {
+  /**
+   * Transforms its supervised plan using `transformer` and returns a copy of `SupervisingCommand`
+   */
+  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a1e9c4229b19..eec79ad02e29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SupervisingCommand}
 import org.apache.spark.sql.catalyst.trees.LeafLike
 import org.apache.spark.sql.connector.ExternalCommandRunner
 import org.apache.spark.sql.execution.{CommandExecutionMode, ExplainMode, LeafExecNode, SparkPlan, UnaryExecNode}
@@ -157,7 +157,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
     mode: ExplainMode)
-  extends LeafRunnableCommand {
+  extends RunnableCommand with SupervisingCommand {
 
   override val output: Seq[Attribute] =
     Seq(AttributeReference("plan", StringType, nullable = true)())
@@ -171,6 +171,9 @@ case class ExplainCommand(
       ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
         .map(Row(_)).toImmutableArraySeq
   }
+
+  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan =
+    copy(logicalPlan = transformer(logicalPlan))
 }
 
 /** An explain command for users to see how a streaming batch is executed. */
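To illustrate the contract the new trait imposes, here is a hedged sketch of a third-party command adopting it; `HoldPlanCommand` is hypothetical and not part of this patch, and it assumes the catalyst classes above are on the classpath:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand}

// Hypothetical command that supervises a plan without exposing it as a child.
// Since SupervisingCommand extends LeafCommand, the analyzer will not recurse
// into `supervised` on its own; MoveParameterizedQueriesDown reaches it only
// through withTransformedSupervisedPlan.
case class HoldPlanCommand(supervised: LogicalPlan) extends SupervisingCommand {
  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan =
    copy(supervised = transformer(supervised))
}
```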
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1a97b965da2b..990b7da339a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -760,7 +760,7 @@ case class DescribeTableCommand(
  * 7. Common table expressions (CTEs)
  */
 case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
-  extends DescribeCommandBase with CTEInChildren {
+  extends DescribeCommandBase with SupervisingCommand with CTEInChildren {
 
   override val output = DescribeCommandSchema.describeTableAttributes()
 
@@ -776,6 +776,9 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
   override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
     copy(plan = WithCTE(plan, cteDefs))
   }
+
+  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan =
+    copy(plan = transformer(plan))
 }
 
 /**
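One detail worth noting before the test diff: explain output embeds per-run expression IDs, so the new tests normalize them before comparing plan strings. A minimal, standalone sketch of that normalization:

```scala
// Expression IDs such as #123 differ from run to run; the tests rewrite
// them to the stable token #N before comparing against expected plans.
val plan = "Project [x#123, y#456]"
assert(plan.replaceAll("#[0-9]+", "#N") == "Project [x#N, y#N]")
```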
t(x, y)", Array(1, "a")), + """== Parsed Logical Plan == + |'PosParameterizedQuery [1, a] + |+- 'Project [*] + | +- 'SubqueryAlias t + | +- 'UnresolvedInlineTable [x, y], [[posparameter(39), posparameter(42)]] + + |== Analyzed Logical Plan == + |x: int, y: string + |Project [x#N, y#N] + |+- SubqueryAlias t + | +- LocalRelation [x#N, y#N] + + |== Optimized Logical Plan == + |LocalRelation [x#N, y#N] + + |== Physical Plan == + |LocalTableScan [x#N, y#N] + |""" + ) + checkQueryPlan( + spark.sql( + "explain extended select * from values (:first, :second) t(x, y)", + Map("first" -> 1, "second" -> "a") + ), + """== Parsed Logical Plan == + |'NameParameterizedQuery [first, second], [1, a] + |+- 'Project [*] + | +- 'SubqueryAlias t + | +- 'UnresolvedInlineTable [x, y], [[namedparameter(first), namedparameter(second)]] + + |== Analyzed Logical Plan == + |x: int, y: string + |Project [x#N, y#N] + |+- SubqueryAlias t + | +- LocalRelation [x#N, y#N] + + |== Optimized Logical Plan == + |LocalRelation [x#N, y#N] + + |== Physical Plan == + |LocalTableScan [x#N, y#N] + |""" + ) + } } case class Foo(bar: Option[String]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org