This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 05f7aa596c7b [SPARK-46794][SQL] Remove subqueries from LogicalRDD constraints
05f7aa596c7b is described below
commit 05f7aa596c7b1c05704abfad94b1b1d3085c530e
Author: Tom van Bussel <[email protected]>
AuthorDate: Tue Jan 23 08:45:32 2024 -0800
[SPARK-46794][SQL] Remove subqueries from LogicalRDD constraints
### What changes were proposed in this pull request?

This PR modifies `LogicalRDD` to filter out all subqueries from its
`constraints`.
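
For reference, the core of the change (shown in full in the diff below) is a
filter on the constraint set that `LogicalRDD` exposes:

    override lazy val constraints: ExpressionSet =
      originConstraints.getOrElse(ExpressionSet())
        .filterNot(SubqueryExpression.hasSubquery)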
### Why are the changes needed?

Fixes a correctness bug. Spark can produce incorrect results when using a
checkpointed `DataFrame` with a filter containing a scalar subquery. This
subquery is included in the constraints of the resulting `LogicalRDD`, and may
then be propagated as a filter when joining with the checkpointed `DataFrame`.
This causes the subquery to be evaluated twice: once during checkpointing and
once while evaluating the query. These two subquery evaluations may return
different results, e.g. when t [...]
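
For illustration, a minimal sketch of the problematic pattern, adapted from the
new test in the diff below (note that `Dataset.logicalPlan` and `ScalarSubquery`
are internal APIs, so this only compiles from within the `org.apache.spark.sql`
package; the checkpoint directory path is illustrative):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.ScalarSubquery

    // A filter containing a scalar subquery; the subquery ends up in the
    // constraints of the plan produced by checkpointing.
    val subquery =
      new Column(ScalarSubquery(spark.range(10).selectExpr("max(id)").logicalPlan))
    val df = spark.range(1000).filter($"id" === subquery)

    // Before this fix, the subquery survived in the checkpointed plan's
    // constraints, so reusing the checkpoint (e.g. in a join) could trigger a
    // second, possibly different, evaluation of the subquery.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // illustrative path
    val checkpointedDf = df.checkpoint()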
### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

Added a test to `DataFrameSuite`.
### Was this patch authored or co-authored using generative AI tooling?

No
Closes #44833 from tomvanbussel/SPARK-46794.
Authored-by: Tom van Bussel <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit d26e871136e0c6e1f84a25978319733a516b7b2e)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/execution/ExistingRDD.scala | 7 +++++++
.../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 16 +++++++++++++++-
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3dcf0efaadd8..3b49abcb1a86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -150,6 +150,13 @@ case class LogicalRDD(
}
   override lazy val constraints: ExpressionSet = originConstraints.getOrElse(ExpressionSet())
+    // Subqueries can have non-deterministic results even when they only contain deterministic
+    // expressions (e.g. consider a LIMIT 1 subquery without an ORDER BY). Propagating predicates
+    // containing a subquery causes the subquery to be executed twice (as the result of the subquery
+    // in the checkpoint computation cannot be reused), which could result in incorrect results.
+    // Therefore we assume that all subqueries are non-deterministic, and we do not expose any
+    // constraints that contain a subquery.
+    .filterNot(SubqueryExpression.hasSubquery)
}
object LogicalRDD extends Logging {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2eba9f181098..002719f06896 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, ScalarSubquery, Uuid}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
@@ -2258,6 +2258,20 @@ class DataFrameSuite extends QueryTest
assert(newConstraints === newExpectedConstraints)
}
+ test("SPARK-46794: exclude subqueries from LogicalRDD constraints") {
+ withTempDir { checkpointDir =>
+ val subquery =
+ new
Column(ScalarSubquery(spark.range(10).selectExpr("max(id)").logicalPlan))
+ val df = spark.range(1000).filter($"id" === subquery)
+
assert(df.logicalPlan.constraints.exists(_.exists(_.isInstanceOf[ScalarSubquery])))
+
+ spark.sparkContext.setCheckpointDir(checkpointDir.getAbsolutePath)
+ val checkpointedDf = df.checkpoint()
+ assert(!checkpointedDf.logicalPlan.constraints
+ .exists(_.exists(_.isInstanceOf[ScalarSubquery])))
+ }
+ }
+
test("SPARK-10656: completely support special chars") {
val df = Seq(1 -> "a").toDF("i_$.a", "d^'a.")
checkAnswer(df.select(df("*")), Row(1, "a"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]