This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 05f7aa596c7b [SPARK-46794][SQL] Remove subqueries from LogicalRDD constraints 05f7aa596c7b is described below commit 05f7aa596c7b1c05704abfad94b1b1d3085c530e Author: Tom van Bussel <tom.vanbus...@databricks.com> AuthorDate: Tue Jan 23 08:45:32 2024 -0800 [SPARK-46794][SQL] Remove subqueries from LogicalRDD constraints This PR modifies `LogicalRDD` to filter out all subqueries from its `constraints`. Fixes a correctness bug. Spark can produce incorrect results when using a checkpointed `DataFrame` with a filter containing a scalar subquery. This subquery is included in the constraints of the resulting `LogicalRDD`, and may then be propagated as a filter when joining with the checkpointed `DataFrame`. This causes the subquery to be evaluated twice: once during checkpointing and once while evaluating the query. These two subquery evaluations may return different results, e.g. when the subquery is non-deterministic [remainder of sentence truncated by the mailing-list archiver]. Does this PR introduce any user-facing change? No. How was this patch tested? Added a test to `DataFrameSuite`. Was this patch authored or co-authored using generative AI tooling? No. Closes #44833 from tomvanbussel/SPARK-46794. 
Authored-by: Tom van Bussel <tom.vanbus...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit d26e871136e0c6e1f84a25978319733a516b7b2e) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/sql/execution/ExistingRDD.scala | 7 +++++++ .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 16 +++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 3dcf0efaadd8..3b49abcb1a86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -150,6 +150,13 @@ case class LogicalRDD( } override lazy val constraints: ExpressionSet = originConstraints.getOrElse(ExpressionSet()) + // Subqueries can have non-deterministic results even when they only contain deterministic + // expressions (e.g. consider a LIMIT 1 subquery without an ORDER BY). Propagating predicates + // containing a subquery causes the subquery to be executed twice (as the result of the subquery + // in the checkpoint computation cannot be reused), which could result in incorrect results. + // Therefore we assume that all subqueries are non-deterministic, and we do not expose any + // constraints that contain a subquery. 
+ .filterNot(SubqueryExpression.hasSubquery) } object LogicalRDD extends Logging { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2eba9f181098..002719f06896 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, Uuid} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, EqualTo, ExpressionSet, GreaterThan, Literal, PythonUDF, ScalarSubquery, Uuid} import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics} @@ -2258,6 +2258,20 @@ class DataFrameSuite extends QueryTest assert(newConstraints === newExpectedConstraints) } + test("SPARK-46794: exclude subqueries from LogicalRDD constraints") { + withTempDir { checkpointDir => + val subquery = + new Column(ScalarSubquery(spark.range(10).selectExpr("max(id)").logicalPlan)) + val df = spark.range(1000).filter($"id" === subquery) + assert(df.logicalPlan.constraints.exists(_.exists(_.isInstanceOf[ScalarSubquery]))) + + spark.sparkContext.setCheckpointDir(checkpointDir.getAbsolutePath) + val checkpointedDf = df.checkpoint() + assert(!checkpointedDf.logicalPlan.constraints + .exists(_.exists(_.isInstanceOf[ScalarSubquery]))) + } + } + test("SPARK-10656: 
completely support special chars") { val df = Seq(1 -> "a").toDF("i_$.a", "d^'a.") checkAnswer(df.select(df("*")), Row(1, "a")) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org