This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e52b59b540f [SPARK-47939][SQL] Implement a new Analyzer rule to move ParameterizedQuery inside ExplainCommand and DescribeQueryCommand
0e52b59b540f is described below

commit 0e52b59b540fac85972453093805c76b4f71cb94
Author: Vladimir Golubev <vladimir.golu...@databricks.com>
AuthorDate: Mon Apr 29 17:00:32 2024 +0800

    [SPARK-47939][SQL] Implement a new Analyzer rule to move ParameterizedQuery inside ExplainCommand and DescribeQueryCommand
    
    ### What changes were proposed in this pull request?
    Mark `DescribeQueryCommand` and `ExplainCommand` as `SupervisingCommand` (they don't expose their wrapped nodes as children, but supervise them internally). Introduce a new Analyzer rule, `MoveParameterizedQueriesDown`, which moves `ParameterizedQuery` inside the `SupervisingCommand` so that parameters are resolved correctly.
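    
    Conceptually, the rule pushes the parameterization node below any chain of supervising commands. The following is a minimal, self-contained sketch of that rewrite using toy stand-in types (`Plan`, `Query`, `Explain`, `Parameterized`, and `moveDown` are illustrative, not the actual Catalyst classes):
    
    ```scala
    // Toy model of the MoveParameterizedQueriesDown rewrite.
    sealed trait Plan
    case class Query(sql: String) extends Plan
    case class Explain(child: Plan) extends Plan // a "supervising" command
    case class Parameterized(child: Plan, args: Seq[Any]) extends Plan
    
    def moveDown(plan: Plan): Plan = plan match {
      case Parameterized(child, args) =>
        // Recurse through nested supervising commands, then re-wrap the
        // supervised plan so its parameters can be bound by the analyzer.
        def rewrite(p: Plan): Plan = p match {
          case Explain(inner) => Explain(rewrite(inner))
          case leaf => Parameterized(leaf, args)
        }
        rewrite(child)
      case other => other
    }
    
    // moveDown(Parameterized(Explain(Explain(Query("select ?"))), Seq(1)))
    //   == Explain(Explain(Parameterized(Query("select ?"), Seq(1))))
    ```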
    
    ### Why are the changes needed?
    Parameterized `EXPLAIN` and `DESCRIBE` queries:
    - `spark.sql("describe select ?", Array(1)).show();`
    - `spark.sql("explain select ?", Array(1)).show();`
    fail with:
    `org.apache.spark.sql.catalyst.ExtendedAnalysisException: [UNBOUND_SQL_PARAMETER] Found the unbound parameter: _16. Please, fix `args` and provide a mapping of the parameter to either a SQL literal or collection constructor functions such as `map()`, `array()`, `struct()`. SQLSTATE: 42P02; line 1 pos 16; 'Project [unresolvedalias(posparameter(16))] +- OneRowRelation`
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, parameterized `EXPLAIN` and `DESCRIBE` queries should start working for users, as shown below.
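    
    For example, both of the following now return results instead of raising `UNBOUND_SQL_PARAMETER` (a minimal usage sketch, assuming a `SparkSession` named `spark` on a build that includes this patch):
    
    ```scala
    // Positional and named parameters now resolve inside supervising commands.
    spark.sql("explain select ?", Array(1)).show(truncate = false)
    spark.sql("describe select :first", Map("first" -> 1)).show()
    ```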
    
    ### How was this patch tested?
    - Run `sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala`
    - Run `sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala`
    - New tests in `SQLQuerySuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46209 from vladimirg-db/vladimirg-db/make-explain-and-describe-work-with-parameters.
    
    Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   1 +
 .../spark/sql/catalyst/analysis/parameters.scala   |  64 ++++++++--
 .../spark/sql/catalyst/plans/logical/Command.scala |  11 ++
 .../spark/sql/execution/command/commands.scala     |   7 +-
 .../spark/sql/execution/command/tables.scala       |   5 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 141 +++++++++++++++++++++
 6 files changed, 218 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4b753e1f28e5..c29432c916f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -325,6 +325,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       RewriteDeleteFromTable ::
       RewriteUpdateTable ::
       RewriteMergeIntoTable ::
+      MoveParameterizedQueriesDown ::
       BindParameters ::
       typeCoercionRules() ++
       Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index f1cc44b270bc..5b365a0d49ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, CreateMap, CreateNamedStruct, Expression, LeafExpression, Literal, MapFromArrays, MapFromEntries, SubqueryExpression, Unevaluable, VariableReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.types.DataType
 
@@ -104,12 +104,64 @@ case class PosParameterizedQuery(child: LogicalPlan, args: Seq[Expression])
     copy(child = newChild)
 }
 
+/**
+ * Base class for rules that process parameterized queries.
+ */
+abstract class ParameterizedQueryProcessor extends Rule[LogicalPlan] {
+  def assertUnresolvedPlanHasSingleParameterizedQuery(plan: LogicalPlan): Unit = {
+    if (plan.containsPattern(PARAMETERIZED_QUERY)) {
+      val parameterizedQueries = plan.collect { case p: ParameterizedQuery => p }
+      assert(parameterizedQueries.length == 1)
+    }
+  }
+}
+
+/**
+ * Moves `ParameterizedQuery` inside `SupervisingCommand` for their supervised plans to be
+ * resolved later by the analyzer.
+ *
+ * - Basic case:
+ * `PosParameterizedQuery(ExplainCommand(SomeQuery(...)))` =>
+ * `ExplainCommand(PosParameterizedQuery(SomeQuery(...)))`
+ * - Nested `SupervisingCommand`s are handled recursively:
+ * `PosParameterizedQuery(ExplainCommand(ExplainCommand(SomeQuery(...))))` =>
+ * `ExplainCommand(ExplainCommand(PosParameterizedQuery(SomeQuery(...))))`
+ */
+object MoveParameterizedQueriesDown extends ParameterizedQueryProcessor {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
+
+    plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
+      case pq: ParameterizedQuery if pq.exists(isSupervisingCommand) =>
+        moveParameterizedQueryIntoSupervisingCommand(pq)
+    }
+  }
+
+  private def moveParameterizedQueryIntoSupervisingCommand(pq: ParameterizedQuery): LogicalPlan = {
+    // Moves parameterized query down recursively to handle nested `SupervisingCommand`s
+    def transformSupervisedPlan: PartialFunction[LogicalPlan, LogicalPlan] = {
+      case command: SupervisingCommand =>
+        command.withTransformedSupervisedPlan {
+          transformSupervisedPlan(_)
+        }
+      case plan => pq.withNewChildren(Seq(plan))
+    }
+
+    pq.child.resolveOperatorsWithPruning(_.containsPattern(COMMAND)) {
+      case command: SupervisingCommand => transformSupervisedPlan(command)
+    }
+  }
+
+  private def isSupervisingCommand(plan: LogicalPlan): Boolean =
+    plan.containsPattern(COMMAND) && plan.isInstanceOf[SupervisingCommand]
+}
+
 /**
  * Finds all named parameters in `ParameterizedQuery` and substitutes them by literals or
  * by collection constructor functions such as `map()`, `array()`, `struct()`
  * from the user-specified arguments.
  */
-object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
+object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
   private def checkArgs(args: Iterable[(String, Expression)]): Unit = {
     def isNotAllowed(expr: Expression): Boolean = expr.exists {
       case _: Literal | _: CreateArray | _: CreateNamedStruct |
@@ -131,11 +183,7 @@ object BindParameters extends Rule[LogicalPlan] with 
QueryErrorsBase {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (plan.containsPattern(PARAMETERIZED_QUERY)) {
-      // One unresolved plan can have at most one ParameterizedQuery.
-      val parameterizedQueries = plan.collect { case p: ParameterizedQuery => p }
-      assert(parameterizedQueries.length == 1)
-    }
+    assertUnresolvedPlanHasSingleParameterizedQuery(plan)
 
     plan.resolveOperatorsWithPruning(_.containsPattern(PARAMETERIZED_QUERY)) {
       // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index fc9eb5d03e49..bd277e92d11d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -63,3 +63,14 @@ trait AnalysisOnlyCommand extends Command {
   // on the `AnalysisContext`
   def markAsAnalyzed(analysisContext: AnalysisContext): LogicalPlan
 }
+
+/**
+ * A logical node that does not expose its sub-nodes as children, but rather supervises them
+ * in an implementation-defined manner.
+ */
+trait SupervisingCommand extends LeafCommand {
+  /**
+   * Transforms its supervised plan using `transformer` and returns a copy of `SupervisingCommand`
+   */
+  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a1e9c4229b19..eec79ad02e29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SupervisingCommand}
 import org.apache.spark.sql.catalyst.trees.LeafLike
 import org.apache.spark.sql.connector.ExternalCommandRunner
 import org.apache.spark.sql.execution.{CommandExecutionMode, ExplainMode, LeafExecNode, SparkPlan, UnaryExecNode}
@@ -157,7 +157,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
     mode: ExplainMode)
-  extends LeafRunnableCommand {
+  extends RunnableCommand with SupervisingCommand {
 
   override val output: Seq[Attribute] =
     Seq(AttributeReference("plan", StringType, nullable = true)())
@@ -171,6 +171,9 @@ case class ExplainCommand(
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n")
       .map(Row(_)).toImmutableArraySeq
   }
+
+  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan =
+    copy(logicalPlan = transformer(logicalPlan))
 }
 
 /** An explain command for users to see how a streaming batch is executed. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1a97b965da2b..990b7da339a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -760,7 +760,7 @@ case class DescribeTableCommand(
  * 7. Common table expressions (CTEs)
  */
 case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
-  extends DescribeCommandBase with CTEInChildren {
+  extends DescribeCommandBase with SupervisingCommand with CTEInChildren {
 
   override val output = DescribeCommandSchema.describeTableAttributes()
 
@@ -776,6 +776,9 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
   override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
     copy(plan = WithCTE(plan, cteDefs))
   }
+
+  def withTransformedSupervisedPlan(transformer: LogicalPlan => LogicalPlan): LogicalPlan =
+    copy(plan = transformer(plan))
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 78d4b91088a6..470f8ff4cd85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4715,6 +4715,147 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"), "outer")
     df5.crossJoin(df6)
   }
+
+  test("SPARK-47939: Describe should work with parameterized queries") {
+    checkAnswer(
+      spark.sql("describe select ?", Array(1)),
+      Array(
+        Row("1", "int", null)
+      )
+    )
+    checkAnswer(
+      spark.sql("describe select :first", Map("first" -> 1)),
+      Array(
+        Row("1", "int", null)
+      )
+    )
+
+    checkAnswer(
+      spark.sql("describe select * from values (?, ?) t(x, y)", Array(1, "a")),
+      Array(
+        Row("x", "int", null),
+        Row("y", "string", null)
+      )
+    )
+    checkAnswer(
+      spark.sql(
+        "describe select * from values (:first, :second) t(x, y)",
+        Map("first" -> 1, "second" -> "a")
+      ),
+      Array(
+        Row("x", "int", null),
+        Row("y", "string", null)
+      )
+    )
+  }
+
+  test("SPARK-47939: Explain should work with parameterized queries") {
+    def checkQueryPlan(df: DataFrame, plan: String): Unit = assert(
+      df.collect()
+        .map(_.getString(0))
+        .map(_.replaceAll("#[0-9]+", "#N"))
+        .sameElements(Array(plan.stripMargin))
+    )
+
+    checkQueryPlan(
+      spark.sql("explain select ?", Array(1)),
+      """== Physical Plan ==
+        |*(1) Project [1 AS 1#N]
+        |+- *(1) Scan OneRowRelation[]
+
+        |"""
+    )
+    checkQueryPlan(
+      spark.sql("explain select :first", Map("first" -> 1)),
+      """== Physical Plan ==
+        |*(1) Project [1 AS 1#N]
+        |+- *(1) Scan OneRowRelation[]
+
+        |"""
+    )
+
+    checkQueryPlan(
+      spark.sql("explain explain explain select ?", Array(1)),
+      """== Physical Plan ==
+        |Execute ExplainCommand
+        |   +- ExplainCommand ExplainCommand 'PosParameterizedQuery [1], SimpleMode, SimpleMode
+
+        |"""
+    )
+    checkQueryPlan(
+      spark.sql("explain explain explain select :first", Map("first" -> 1)),
+      // scalastyle:off
+      """== Physical Plan ==
+        |Execute ExplainCommand
+        |   +- ExplainCommand ExplainCommand 'NameParameterizedQuery [first], [1], SimpleMode, SimpleMode
+
+        |"""
+      // scalastyle:on
+    )
+
+    checkQueryPlan(
+      spark.sql("explain describe select ?", Array(1)),
+      """== Physical Plan ==
+        |Execute DescribeQueryCommand
+        |   +- DescribeQueryCommand select ?
+
+        |"""
+    )
+    checkQueryPlan(
+      spark.sql("explain describe select :first", Map("first" -> 1)),
+      """== Physical Plan ==
+        |Execute DescribeQueryCommand
+        |   +- DescribeQueryCommand select :first
+
+        |"""
+    )
+
+    checkQueryPlan(
+      spark.sql("explain extended select * from values (?, ?) t(x, y)", Array(1, "a")),
+      """== Parsed Logical Plan ==
+        |'PosParameterizedQuery [1, a]
+        |+- 'Project [*]
+        |   +- 'SubqueryAlias t
+        |      +- 'UnresolvedInlineTable [x, y], [[posparameter(39), posparameter(42)]]
+
+        |== Analyzed Logical Plan ==
+        |x: int, y: string
+        |Project [x#N, y#N]
+        |+- SubqueryAlias t
+        |   +- LocalRelation [x#N, y#N]
+
+        |== Optimized Logical Plan ==
+        |LocalRelation [x#N, y#N]
+
+        |== Physical Plan ==
+        |LocalTableScan [x#N, y#N]
+        |"""
+    )
+    checkQueryPlan(
+      spark.sql(
+        "explain extended select * from values (:first, :second) t(x, y)",
+        Map("first" -> 1, "second" -> "a")
+      ),
+      """== Parsed Logical Plan ==
+        |'NameParameterizedQuery [first, second], [1, a]
+        |+- 'Project [*]
+        |   +- 'SubqueryAlias t
+        |      +- 'UnresolvedInlineTable [x, y], [[namedparameter(first), namedparameter(second)]]
+
+        |== Analyzed Logical Plan ==
+        |x: int, y: string
+        |Project [x#N, y#N]
+        |+- SubqueryAlias t
+        |   +- LocalRelation [x#N, y#N]
+
+        |== Optimized Logical Plan ==
+        |LocalRelation [x#N, y#N]
+
+        |== Physical Plan ==
+        |LocalTableScan [x#N, y#N]
+        |"""
+    )
+  }
 }
 
 case class Foo(bar: Option[String])

