This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 93d5816b3f1  [SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2
93d5816b3f1 is described below

commit 93d5816b3f1460b405c9828ed5ae646adfa236aa
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Fri Mar 10 20:58:38 2023 +0800

[SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2

### What changes were proposed in this pull request?
After https://github.com/apache/spark/pull/37525 (SPARK-40086 / SPARK-42049) the following simple query containing a subselect expression:
```
select (select sum(id) from t1)
```
fails with:
```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	...
	at org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
	at scala.runtime.Statics.anyHash(Statics.java:122)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
	at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
	at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
	at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
	at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
	at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
	at scala.collection.mutable.HashTable.init(HashTable.scala:110)
	at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
	at scala.collection.mutable.HashMap.init(HashMap.scala:44)
	at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
	...
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
when DSv2 is enabled.

This PR proposes to fix `BatchScanExec`'s `equals()` and `hashCode()`, as those shouldn't throw an NPE under any circumstances. But if we dig deeper, we realize that the NPE has occurred since https://github.com/apache/spark/pull/37525 and that the root cause of the problem is the change of `AliasAwareOutputExpression.aliasMap` from an immutable to a mutable map. Deserializing a mutable map invokes `hashCode()` on the keys, which is not the case with immutable maps. In this case the key is a subquery expression whose plan contains the `BatchScanExec`.
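For illustration only (this sketch is not part of the commit): the failure mode above can be reproduced outside Spark with plain Java serialization. `FragileKey` and `roundTrip` below are hypothetical stand-ins for a plan node whose `hashCode()` dereferences a field that is null after deserialization, and for the executor-side deserialization done by the serializer; the mutable map's `readObject` re-inserts every entry and therefore calls `hashCode()` on each key, which is exactly the path in the stack trace.
```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for a plan node whose hashCode() dereferences a
// field that is only available on the driver (here: dropped via @transient).
class FragileKey(@transient val scan: AnyRef) extends Serializable {
  override def hashCode(): Int = scan.hashCode()              // NPE once scan is null
  override def equals(other: Any): Boolean = other.isInstanceOf[FragileKey]
}

object MutableMapNpeSketch {
  // Plain Java serialization round trip, standing in for shipping a task to an executor.
  private def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val key = new FragileKey(new Object)                      // hashCode() still works here
    // In this sketch, a small immutable map round-trips fine: its keys are not re-hashed on read.
    roundTrip(Map(key -> "a"))
    // mutable.HashMap.readObject re-inserts each entry via addEntry, which calls hashCode()
    // on the deserialized key whose transient field is now null -> NullPointerException.
    roundTrip(mutable.HashMap(key -> "a"))
  }
}
```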
Please note that the mutability of `aliasMap` shouldn't be an issue as it is a `private` field of `AliasAwareOutputExpression` (though adding a simple `.toMap` would also help to avoid the NPE). Based on the above findings this PR also proposes making `aliasMap` transient as it isn't needed on executors.

A side question is whether adding any subquery expressions to `AliasAwareOutputExpression.aliasMap` makes sense at all, because `AliasAwareOutputExpression.projectExpression()` mainly projects `child.outputPartitioning` and `child.outputOrdering`, which can't contain subquery expressions. But there are a few exceptions (`SortAggregateExec`, `TakeOrderedAndProjectExec`) where `AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the `child` and actually leaving those expressions i [...]

### Why are the changes needed?
To fix a regression introduced with https://github.com/apache/spark/pull/37525.

### Does this PR introduce _any_ user-facing change?
Yes, the query works again.

### How was this patch tested?
Added new UT.

Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.

Authored-by: Peter Toth <peter.t...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/plans/AliasAwareOutputExpression.scala |  1 +
 .../sql/execution/datasources/v2/BatchScanExec.scala    |  5 +++--
 .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 16 ++++++++++++++++
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index cfe22994592..2cca7b844cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -40,6 +40,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
   // more than `aliasCandidateLimit` attributes for an expression. In those cases the old logic
   // handled only the last alias so we need to make sure that we give precedence to that.
   // If the `outputExpressions` contain simple attributes we need to add those too to the map.
+  @transient
   private lazy val aliasMap = {
     val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
     outputExpressions.reverse.foreach {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 52f15cf7b65..d43331d57c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -44,12 +44,13 @@ case class BatchScanExec(
     applyPartialClustering: Boolean = false,
     replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {

-  @transient lazy val batch = scan.toBatch
+  @transient lazy val batch = if (scan == null) null else scan.toBatch

   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
     case other: BatchScanExec =>
-      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters &&
+      this.batch != null && this.batch == other.batch &&
+      this.runtimeFilters == other.runtimeFilters &&
       this.commonPartitionValues == other.commonPartitionValues &&
       this.replicatePartitions == other.replicatePartitions &&
       this.applyPartialClustering == other.applyPartialClustering
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 4d76013d659..7c15a05c586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2679,4 +2679,20 @@ class SubquerySuite extends QueryTest
       Row(8, 6))
     }
   }
+
+  test("SPARK-42745: Improved AliasAwareOutputExpression works with DSv2") {
+    withSQLConf(
+      SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath { path =>
+        spark.range(0)
+          .write
+          .mode("overwrite")
+          .parquet(path.getCanonicalPath)
+        withTempView("t1") {
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkAnswer(sql("select (select sum(id) from t1)"), Row(null))
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org