This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 70ff959  [SPARK-31721][SQL] Assert optimized is initialized before 
tracking the planning time
70ff959 is described below

commit 70ff959c513aad212c2852ab6d8a8176b3e44a6d
Author: Ali Afroozeh <[email protected]>
AuthorDate: Tue May 19 11:10:49 2020 +0200

    [SPARK-31721][SQL] Assert optimized is initialized before tracking the 
planning time
    
    ### What changes were proposed in this pull request?
    The QueryPlanningTracker in QueryExecution reports a planning time that 
also includes the optimization time. This happens because the optimizedPlan in 
QueryExecution is lazy and is only initialized when first accessed. When 
df.queryExecution.executedPlan is called, the tracker starts recording the 
planning time and only then evaluates the optimized plan. This causes the planning 
phase to start before optimization and to also include the optimization time.
    This PR fixes this behavior by introducing a method assertOptimized, 
similar to assertAnalyzed, that explicitly initializes the optimized plan. This 
method is called before measuring the time for sparkPlan and executedPlan. We 
call it before sparkPlan because sparkPlan is also tracked as planning time.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests
    
    Closes #28543 from dbaliafroozeh/AddAssertOptimized.
    
    Authored-by: Ali Afroozeh <[email protected]>
    Signed-off-by: herman <[email protected]>
    (cherry picked from commit b9cc31cd958fec4b473e757694c5f811316c858e)
    Signed-off-by: herman <[email protected]>
---
 .../spark/sql/execution/QueryExecution.scala       | 27 ++++++++++++++++------
 .../QueryPlanningTrackerEndToEndSuite.scala        |  9 ++++++++
 .../apache/spark/sql/execution/SparkPlanTest.scala |  3 ++-
 3 files changed, 31 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 99bc45f..1df812d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -82,17 +82,30 @@ class QueryExecution(
     
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), 
tracker)
   }
 
-  lazy val sparkPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
-    // Clone the logical plan here, in case the planner rules change the 
states of the logical plan.
-    QueryExecution.createSparkPlan(sparkSession, planner, 
optimizedPlan.clone())
+  private def assertOptimized(): Unit = optimizedPlan
+
+  lazy val sparkPlan: SparkPlan = {
+    // We need to materialize the optimizedPlan here because sparkPlan is also 
tracked under
+    // the planning phase
+    assertOptimized()
+    executePhase(QueryPlanningTracker.PLANNING) {
+      // Clone the logical plan here, in case the planner rules change the 
states of the logical
+      // plan.
+      QueryExecution.createSparkPlan(sparkSession, planner, 
optimizedPlan.clone())
+    }
   }
 
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = 
executePhase(QueryPlanningTracker.PLANNING) {
-    // clone the plan to avoid sharing the plan instance between different 
stages like analyzing,
-    // optimizing and planning.
-    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
+  lazy val executedPlan: SparkPlan = {
+    // We need to materialize the optimizedPlan here, before tracking the 
planning phase, to ensure
+    // that the optimization time is not counted as part of the planning phase.
+    assertOptimized()
+    executePhase(QueryPlanningTracker.PLANNING) {
+      // clone the plan to avoid sharing the plan instance between different 
stages like analyzing,
+      // optimizing and planning.
+      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
+    }
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
index 987338c..5ff4595 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -58,4 +58,13 @@ class QueryPlanningTrackerEndToEndSuite extends StreamTest {
       StopStream)
   }
 
+  test("The start times should be in order: parsing <= analysis <= 
optimization <= planning") {
+    val df = spark.sql("select count(*) from range(1)")
+    df.queryExecution.executedPlan
+    val phases = df.queryExecution.tracker.phases
+    assert(phases("parsing").startTimeMs <= phases("analysis").startTimeMs)
+    assert(phases("analysis").startTimeMs <= 
phases("optimization").startTimeMs)
+    assert(phases("optimization").startTimeMs <= 
phases("planning").startTimeMs)
+  }
+
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index b29e822..7ddf9d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -237,7 +238,7 @@ object SparkPlanTest {
    * @param spark SqlContext used for execution of the plan
    */
   def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
-    val execution = new QueryExecution(spark.sparkSession, null) {
+    val execution = new QueryExecution(spark.sparkSession, LocalRelation(Nil)) 
{
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>
           val inputMap = plan.children.flatMap(_.output).map(a => (a.name, 
a)).toMap


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to