This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 05f7aa596c7b [SPARK-46794][SQL] Remove subqueries from LogicalRDD constraints
05f7aa596c7b is described below

commit 05f7aa596c7b1c05704abfad94b1b1d3085c530e
Author: Tom van Bussel <tom.vanbus...@databricks.com>
AuthorDate: Tue Jan 23 08:45:32 2024 -0800

    [SPARK-46794][SQL] Remove subqueries from LogicalRDD constraints
    
    This PR modifies `LogicalRDD` to filter out all subqueries from its
    `constraints`.
    
    Fixes a correctness bug. Spark can produce incorrect results when using a
    checkpointed `DataFrame` with a filter containing a scalar subquery. This
    subquery is included in the constraints of the resulting `LogicalRDD`, and
    may then be propagated as a filter when joining with the checkpointed
    `DataFrame`. This causes the subquery to be evaluated twice: once during
    checkpointing and once while evaluating the query. These two subquery
    evaluations may return different results, e.g. when t [...]
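
    A minimal sketch of the affected pattern (hypothetical repro, assuming a
    Spark 3.5 session `spark`; not the exact query from the original report):

        // The scalar subquery becomes part of the filter predicate, and before
        // this fix also part of the checkpointed plan's LogicalRDD constraints.
        spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
        val df = spark.sql(
          "SELECT * FROM range(1000) WHERE id = (SELECT max(id) FROM range(10))")
        val ckpt = df.checkpoint()  // first evaluation of the subquery
        // Before the fix, the join below could re-propagate the subquery
        // predicate from the constraints, triggering a second evaluation that
        // may not match the first one.
        val joined = ckpt.join(spark.range(1000), "id")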
    
    No user-facing change.
    
    Added a test to `DataFrameSuite`.
    
    No generative AI tooling was used.
    
    Closes #44833 from tomvanbussel/SPARK-46794.
    
    Authored-by: Tom van Bussel <tom.vanbus...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit d26e871136e0c6e1f84a25978319733a516b7b2e)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/execution/ExistingRDD.scala     |  7 +++++++
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 16 +++++++++++++++-
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3dcf0efaadd8..3b49abcb1a86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -150,6 +150,13 @@ case class LogicalRDD(
   }
 
   override lazy val constraints: ExpressionSet = originConstraints.getOrElse(ExpressionSet())
+    // Subqueries can have non-deterministic results even when they only contain deterministic
+    // expressions (e.g. consider a LIMIT 1 subquery without an ORDER BY). Propagating predicates
+    // containing a subquery causes the subquery to be executed twice (as the result of the subquery
+    // in the checkpoint computation cannot be reused), which could result in incorrect results.
+    // Therefore we assume that all subqueries are non-deterministic, and we do not expose any
+    // constraints that contain a subquery.
+    .filterNot(SubqueryExpression.hasSubquery)
 }
 
 object LogicalRDD extends Logging {
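
For context, `SubqueryExpression.hasSubquery` is true when any node of an
expression tree is a subquery, so the `filterNot` above drops a whole
constraint such as `id = scalar-subquery#1` while keeping ordinary ones such
as `isnotnull(id)`. (The non-determinism the comment mentions is easy to hit:
a `SELECT id FROM t LIMIT 1` subquery without an ORDER BY may return a
different row on each execution.) A self-contained sketch of the filtering
idea, using a hypothetical mini-AST rather than the real catalyst classes:

    // Toy expression hierarchy standing in for catalyst's Expression.
    sealed trait Expr { def children: Seq[Expr] = Nil }
    case class Col(name: String) extends Expr
    case class IsNotNull(child: Expr) extends Expr { override def children = Seq(child) }
    case class Eq(left: Expr, right: Expr) extends Expr { override def children = Seq(left, right) }
    case class ScalarSubq(desc: String) extends Expr  // models a subquery node

    // Mirrors SubqueryExpression.hasSubquery: true if any node is a subquery.
    def hasSubquery(e: Expr): Boolean =
      e.isInstanceOf[ScalarSubq] || e.children.exists(hasSubquery)

    val constraints = Set[Expr](
      IsNotNull(Col("id")),
      Eq(Col("id"), ScalarSubq("max(id) FROM range(10)")))

    // Only isnotnull(id) survives, as with the patched LogicalRDD.constraints.
    val exposed = constraints.filterNot(hasSubquery)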
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2eba9f181098..002719f06896 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, ScalarSubquery, Uuid}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
@@ -2258,6 +2258,20 @@ class DataFrameSuite extends QueryTest
     assert(newConstraints === newExpectedConstraints)
   }
 
+  test("SPARK-46794: exclude subqueries from LogicalRDD constraints") {
+    withTempDir { checkpointDir =>
+      val subquery =
+        new Column(ScalarSubquery(spark.range(10).selectExpr("max(id)").logicalPlan))
+      val df = spark.range(1000).filter($"id" === subquery)
+      assert(df.logicalPlan.constraints.exists(_.exists(_.isInstanceOf[ScalarSubquery])))
+
+      spark.sparkContext.setCheckpointDir(checkpointDir.getAbsolutePath)
+      val checkpointedDf = df.checkpoint()
+      assert(!checkpointedDf.logicalPlan.constraints
+        .exists(_.exists(_.isInstanceOf[ScalarSubquery])))
+    }
+  }
+
   test("SPARK-10656: completely support special chars") {
     val df = Seq(1 -> "a").toDF("i_$.a", "d^'a.")
     checkAnswer(df.select(df("*")), Row(1, "a"))
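
To run just the new test locally, a scalatest substring filter should work
(assuming the usual Spark sbt tooling; the invocation below follows the
developer docs and is not verified against this exact branch):

    build/sbt "sql/testOnly *DataFrameSuite -- -z SPARK-46794"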


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
