This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b4eb1f  [SPARK-37379][SQL] Add tree pattern pruning to 
CTESubstitution rule
3b4eb1f is described below

commit 3b4eb1fbd8a351c29a12bfd94ec4cdbee803f416
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Fri Nov 19 15:24:52 2021 -0800

    [SPARK-37379][SQL] Add tree pattern pruning to CTESubstitution rule
    
    ### What changes were proposed in this pull request?
    
    This PR adds tree pattern pruning to the `CTESubstitution` analyzer rule. 
The rule will now exit early if the tree does not contain an `UnresolvedWith` 
node.
    
    ### Why are the changes needed?
    
    Analysis is eagerly performed after every DataFrame transformation. If a 
user's program performs a long chain of _n_ transformations to construct a 
large query plan then this can lead to _O(n^2)_ performance costs from 
`CTESubstitution` because it is applied _n_ times and each application 
traverses the entire logical plan tree (which contains _O(n)_ nodes). In the 
case of chained `withColumn` calls (leading to stacked `Project` nodes) it's 
possible to see _O(n^3)_ slowdowns where _n_  [...]
    
    Very large DataFrame plans typically do not use CTEs because there is not a 
DataFrame syntax for them (although they might appear in the plan if 
`sql(someQueryWithCTE)` is used). As a result, this PR's proposed optimization 
to skip `CTESubstitution` can greatly reduce the analysis cost for such plans.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I believe that optimizer correctness is covered by existing tests.
    
    As a toy benchmark, I ran
    
    ```
    import org.apache.spark.sql.DataFrame
    org.apache.spark.sql.catalyst.rules.RuleExecutor.resetMetrics()
    (1 to 600).foldLeft(spark.range(100).toDF)((df: DataFrame, i: Int) => 
df.withColumn(s"col$i", $"id" % i))
    println(org.apache.spark.sql.catalyst.rules.RuleExecutor.dumpTimeSpent())
    ```
    
    on my laptop before and after this PR's changes (simulating a _O(n^3)_ 
case). Skipping `CTESubstitution` cut the running time from ~28.4 seconds to 
~15.5 seconds.
    
    The bulk of the remaining time comes from `DeduplicateRelations`, for which 
I plan to submit a separate optimization PR.
    
    Closes #34658 from JoshRosen/CTESubstitution-tree-pattern-pruning.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala | 3 +++
 .../spark/sql/catalyst/plans/logical/basicLogicalOperators.scala       | 2 ++
 .../main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala  | 1 +
 3 files changed, 6 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index ec3d957..2e2d415 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -48,6 +48,9 @@ import 
org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega
  */
 object CTESubstitution extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!plan.containsPattern(UNRESOLVED_WITH)) {
+      return plan
+    }
     val isCommand = plan.find {
       case _: Command | _: ParsedStatement | _: InsertIntoDir => true
       case _ => false
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f1b954d..e8a632d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -626,6 +626,8 @@ object View {
 case class UnresolvedWith(
     child: LogicalPlan,
     cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_WITH)
+
   override def output: Seq[Attribute] = child.output
 
   override def simpleString(maxFields: Int): String = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 6c1b64d..aad90ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -111,6 +111,7 @@ object TreePattern extends Enumeration  {
   val REPARTITION_OPERATION: Value = Value
   val UNION: Value = Value
   val UNRESOLVED_RELATION: Value = Value
+  val UNRESOLVED_WITH: Value = Value
   val TYPED_FILTER: Value = Value
   val WINDOW: Value = Value
   val WITH_WINDOW_DEFINITION: Value = Value

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to