This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a7fa2700e0f0 [SPARK-48196][SQL] Turn QueryExecution lazy val plans into LazyTry
a7fa2700e0f0 is described below

commit a7fa2700e0f0f70ec6306f48a5bd137225029b80
Author: Julek Sompolski <Juliusz Sompolski>
AuthorDate: Mon Sep 30 23:39:50 2024 +0800

    [SPARK-48196][SQL] Turn QueryExecution lazy val plans into LazyTry
    
    ### What changes were proposed in this pull request?
    
    Currently, when evaluation of a `lazy val` plan fails in
    QueryExecution, the `lazy val` remains uninitialized, and another
    attempt to initialize it is made the next time it is referenced. This
    causes planning to be performed multiple times, resulting in
    inefficiencies and potential duplication of side effects, for example
    from ConvertToLocalRelation, which can pull in UDFs with side effects.
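    The retry-on-failure behaviour of `lazy val` can be seen in isolation
    with a small sketch (hypothetical standalone Scala, not Spark code):

    ```scala
    object LazyValRetry {
      var attempts = 0

      // A stand-in for a plan computation that fails on its first run.
      lazy val plan: String = {
        attempts += 1
        if (attempts == 1) throw new RuntimeException("planning failed")
        "resolved plan"
      }

      def main(args: Array[String]): Unit = {
        // First access throws; the lazy val is left uninitialized.
        try plan catch { case _: RuntimeException => () }
        // Second access re-runs the side-effecting initializer and succeeds.
        println(plan)     // resolved plan
        println(attempts) // 2
      }
    }
    ```

    If the initializer had side effects (like a UDF pulled in by
    ConvertToLocalRelation), those side effects would run twice.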
    
    ### Why are the changes needed?
    
    The current behaviour leads to inefficiencies and subtle problems in
    unexpected situations, for example when plans are accessed for logging
    purposes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    This change would bring slight behaviour changes:
    
    Examples:
    ```
    val df = a.join(b)
    spark.conf.set("spark.sql.crossJoin.enabled", "false")
    try { df.collect() } catch { case _: Throwable => }
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    df.collect()
    ```
    This used to succeed: the first `collect()` failed because of the
    implicit cartesian product, so the plan was left uninitialized, and
    the second `collect()` re-ran planning and picked up the new config.
    It now fails, because the second execution rethrows the error cached
    from the first attempt instead of retrying.
    
    The old semantics were: if plan evaluation fails, try again the next
    time the plan is accessed, and once evaluation succeeds, keep that
    plan. The new semantics are: if plan evaluation fails, the error is
    kept and rethrown the next time the plan is accessed. A new
    QueryExecution object / new Dataset is needed to reset it.
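    The new semantics can be sketched with a minimal analogue of the
    internal wrapper (a hypothetical `LazyOnce`, not the actual
    `org.apache.spark.util.LazyTry` implementation): wrapping the body in
    a `Try` means the `lazy val` initializer itself never throws, so it
    runs exactly once, and a failure is cached and rethrown on every
    access.

    ```scala
    import scala.util.Try

    // Hypothetical minimal analogue of Spark's internal LazyTry.
    class LazyOnce[T](body: => T) {
      // Try(...) never throws, so this lazy val always initializes on the
      // first access and the body is never re-run afterwards.
      private lazy val result: Try[T] = Try(body)

      // Returns the cached value, or rethrows the cached failure.
      def get: T = result.get
    }

    object LazyOnceDemo {
      def main(args: Array[String]): Unit = {
        var runs = 0
        val failing = new LazyOnce[String]({ runs += 1; throw new RuntimeException("boom") })
        try failing.get catch { case _: RuntimeException => () } // body runs once
        try failing.get catch { case _: RuntimeException => () } // cached failure rethrown
        println(runs) // 1: the body was not re-executed
      }
    }
    ```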
    
    Spark 4.0 is a good opportunity for this slight behaviour change, to
    make sure that we don't re-execute the optimizer, with its potential
    side effects.
    
    Note: These behaviour changes have already happened in Spark Connect
    mode, where the Dataset object is not reused across executions. This
    change makes Spark Classic and Spark Connect behave the same again.
    
    ### How was this patch tested?
    
    Existing tests show no issues, except for the tests that exhibit the
    behaviour change described above.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Trivial code completion suggestions.
    Generated-by: Github Copilot
    
    Closes #48211 from juliuszsompolski/SPARK-48196-lazyplans.
    
    Authored-by: Julek Sompolski <Juliusz Sompolski>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 python/pyspark/sql/tests/test_udf.py               |  3 +-
 .../spark/sql/execution/QueryExecution.scala       | 63 ++++++++++++++--------
 2 files changed, 44 insertions(+), 22 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 6f672b0ae5fb..879329bd80c0 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -237,11 +237,12 @@ class BaseUDFTestsMixin(object):
         f = udf(lambda a, b: a == b, BooleanType())
         # The udf uses attributes from both sides of join, so it is pulled out as Filter +
         # Cross join.
-        df = left.join(right, f("a", "b"))
         with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
+            df = left.join(right, f("a", "b"))
             with self.assertRaisesRegex(AnalysisException, "Detected implicit cartesian product"):
                 df.collect()
         with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            df = left.join(right, f("a", "b"))
             self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
     def test_udf_in_left_outer_join_condition(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5c894eb7555b..6ff2c5d4b9d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -46,8 +46,8 @@ import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, WatermarkPropagator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.util.{LazyTry, Utils}
 import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.Utils
 
 /**
  * The primary workflow for executing relational queries using Spark.  Designed to allow easy
@@ -86,7 +86,7 @@ class QueryExecution(
     }
   }
 
-  lazy val analyzed: LogicalPlan = {
+  private val lazyAnalyzed = LazyTry {
     val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
       // We can't clone `logical` here, which will reset the `_analyzed` flag.
       sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
@@ -95,12 +95,18 @@ class QueryExecution(
     plan
   }
 
-  lazy val commandExecuted: LogicalPlan = mode match {
-    case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
-    case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
-    case CommandExecutionMode.SKIP => analyzed
+  def analyzed: LogicalPlan = lazyAnalyzed.get
+
+  private val lazyCommandExecuted = LazyTry {
+    mode match {
+      case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
+      case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
+      case CommandExecutionMode.SKIP => analyzed
+    }
   }
 
+  def commandExecuted: LogicalPlan = lazyCommandExecuted.get
+
   private def commandExecutionName(command: Command): String = command match {
     case _: CreateTableAsSelect => "create"
     case _: ReplaceTableAsSelect => "replace"
@@ -141,22 +147,28 @@ class QueryExecution(
     }
   }
 
-  // The plan that has been normalized by custom rules, so that it's more likely to hit cache.
-  lazy val normalized: LogicalPlan = {
+  private val lazyNormalized = LazyTry {
     QueryExecution.normalize(sparkSession, commandExecuted, Some(tracker))
   }
 
-  lazy val withCachedData: LogicalPlan = sparkSession.withActive {
-    assertAnalyzed()
-    assertSupported()
-    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
-    // optimizing and planning.
-    sparkSession.sharedState.cacheManager.useCachedData(normalized.clone())
+  // The plan that has been normalized by custom rules, so that it's more likely to hit cache.
+  def normalized: LogicalPlan = lazyNormalized.get
+
+  private val lazyWithCachedData = LazyTry {
+    sparkSession.withActive {
+      assertAnalyzed()
+      assertSupported()
+      // clone the plan to avoid sharing the plan instance between different stages like analyzing,
+      // optimizing and planning.
+      sparkSession.sharedState.cacheManager.useCachedData(normalized.clone())
+    }
   }
 
+  def withCachedData: LogicalPlan = lazyWithCachedData.get
+
   def assertCommandExecuted(): Unit = commandExecuted
 
-  lazy val optimizedPlan: LogicalPlan = {
+  private val lazyOptimizedPlan = LazyTry {
     // We need to materialize the commandExecuted here because optimizedPlan is also tracked under
     // the optimizing phase
     assertCommandExecuted()
@@ -174,9 +186,11 @@ class QueryExecution(
     }
   }
 
+  def optimizedPlan: LogicalPlan = lazyOptimizedPlan.get
+
   def assertOptimized(): Unit = optimizedPlan
 
-  lazy val sparkPlan: SparkPlan = {
+  private val lazySparkPlan = LazyTry {
     // We need to materialize the optimizedPlan here because sparkPlan is also tracked under
     // the planning phase
     assertOptimized()
@@ -187,11 +201,11 @@ class QueryExecution(
     }
   }
 
+  def sparkPlan: SparkPlan = lazySparkPlan.get
+
   def assertSparkPlanPrepared(): Unit = sparkPlan
 
-  // executedPlan should not be used to initialize any SparkPlan. It should be
-  // only used for execution.
-  lazy val executedPlan: SparkPlan = {
+  private val lazyExecutedPlan = LazyTry {
     // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
     // that the optimization time is not counted as part of the planning phase.
     assertOptimized()
@@ -206,8 +220,16 @@ class QueryExecution(
     plan
   }
 
+  // executedPlan should not be used to initialize any SparkPlan. It should be
+  // only used for execution.
+  def executedPlan: SparkPlan = lazyExecutedPlan.get
+
   def assertExecutedPlanPrepared(): Unit = executedPlan
 
+  val lazyToRdd = LazyTry {
+    new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)
+  }
+
   /**
    * Internal version of the RDD. Avoids copies and has no schema.
   * Note for callers: Spark may apply various optimization including reusing object: this means
@@ -218,8 +240,7 @@ class QueryExecution(
   * Given QueryExecution is not a public class, end users are discouraged to use this: please
    * use `Dataset.rdd` instead where conversion will be applied.
    */
-  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
-    executedPlan.execute(), sparkSession.sessionState.conf)
+  def toRdd: RDD[InternalRow] = lazyToRdd.get
 
   /** Get the metrics observed during the execution of the query plan. */
   def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
