Repository: spark Updated Branches: refs/heads/master 16df133d7 -> 6c5fd977f
[SPARK-15791] Fix NPE in ScalarSubquery ## What changes were proposed in this pull request? The fix is pretty simple: just don't make the executedPlan transient in `ScalarSubquery` since it is referenced at execution time. ## How was this patch tested? I verified the fix manually in non-local mode. It's not clear to me why the problem did not manifest in local mode; any suggestions? cc davies Author: Eric Liang <[email protected]> Closes #13569 from ericl/fix-scalar-npe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c5fd977 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c5fd977 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c5fd977 Branch: refs/heads/master Commit: 6c5fd977fbcb821a57cb4a13bc3d413a695fbc32 Parents: 16df133 Author: Eric Liang <[email protected]> Authored: Thu Jun 9 22:28:31 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Thu Jun 9 22:28:31 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/execution/subquery.scala | 2 +- .../src/test/scala/org/apache/spark/sql/QueryTest.scala | 10 ++++++++-- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 ++- .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 4 ++++ 4 files changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 4a1f12d..461d301 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -32,7 +32,7 @@ 
import org.apache.spark.sql.types.DataType * This is the physical copy of ScalarSubquery to be used inside SparkPlan. */ case class ScalarSubquery( - @transient executedPlan: SparkPlan, + executedPlan: SparkPlan, exprId: ExprId) extends SubqueryExpression { http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 9c044f4..acb59d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -341,10 +341,16 @@ object QueryTest { * * @param df the [[DataFrame]] to be executed * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s. + * @param checkToRDD whether to verify deserialization to an RDD. This runs the query twice. 
*/ - def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = { + def checkAnswer( + df: DataFrame, + expectedAnswer: Seq[Row], + checkToRDD: Boolean = true): Option[String] = { val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty - + if (checkToRDD) { + df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791] + } val sparkAnswer = try df.collect().toSeq catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8284e8d..90465b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2118,7 +2118,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { // is correct. 
def verifyCallCount(df: DataFrame, expectedResult: Row, expectedCount: Int): Unit = { countAcc.setValue(0) - checkAnswer(df, expectedResult) + QueryTest.checkAnswer( + df, Seq(expectedResult), checkToRDD = false /* avoid duplicate exec */) assert(countAcc.value == expectedCount) } http://git-wip-us.apache.org/repos/asf/spark/blob/6c5fd977/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index a932125..05491a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -54,6 +54,10 @@ class SubquerySuite extends QueryTest with SharedSQLContext { t.createOrReplaceTempView("t") } + test("rdd deserialization does not crash [SPARK-15791]") { + sql("select (select 1 as b) as b").rdd.count() + } + test("simple uncorrelated scalar subquery") { checkAnswer( sql("select (select 1 as b) as b"), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
