Repository: spark
Updated Branches:
  refs/heads/master 337c16d57 -> 767cc94ca
[SPARK-7158] [SQL] Fix bug where cached data cannot be used in collect() after cache()

When df.cache() is called, the `withCachedData` plan of the DataFrame's
`QueryExecution` has already been materialized, which means later actions
on that DataFrame will not look up the cached tables.

Author: Cheng Hao <[email protected]>

Closes #5714 from chenghao-intel/SPARK-7158 and squashes the following commits:

58ea8aa [Cheng Hao] style issue
2bf740f [Cheng Hao] create new QueryExecution instance for CacheManager
a5647d9 [Cheng Hao] hide the queryExecution of DataFrame
fbfd3c5 [Cheng Hao] make the DataFrame.queryExecution mutable for cache/persist/unpersist

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767cc94c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767cc94c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767cc94c

Branch: refs/heads/master
Commit: 767cc94ca6d397ba19226996ccb3c8e57083c549
Parents: 337c16d
Author: Cheng Hao <[email protected]>
Authored: Thu Jun 11 18:01:32 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Jun 11 18:01:47 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/CacheManager.scala    |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 26 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/767cc94c/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5fcc48a..a4b38d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
         sqlContext.conf.useCompression,
         sqlContext.conf.columnBatchSize,
         storageLevel,
-        query.queryExecution.executedPlan,
+        sqlContext.executePlan(query.logicalPlan).executedPlan,
         tableName))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/767cc94c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3ca5ff3..14ecd4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -123,6 +123,32 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
     )
   }
 
+  test("SPARK-7158 collect and take return different results") {
+    import java.util.UUID
+    import org.apache.spark.sql.types._
+
+    val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
+    // we expect the id to be materialized only once
+    def id: () => String = () => { UUID.randomUUID().toString() }
+
+    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    // Make a new DataFrame (actually the same reference to the old one)
+    val cached = dfWithId.cache()
+    // Trigger the cache
+    val d0 = dfWithId.collect()
+    val d1 = cached.collect()
+    val d2 = cached.collect()
+
+    // Since the ID is only materialized once, all of the records
+    // should come from the cache, not from re-computation. Otherwise,
+    // the IDs would differ.
+    assert(d0.map(_(0)) === d2.map(_(0)))
+    assert(d0.map(_(1)) === d2.map(_(1)))
+
+    assert(d1.map(_(0)) === d2.map(_(0)))
+    assert(d1.map(_(1)) === d2.map(_(1)))
+  }
+
   test("grouping on nested fields") {
     sqlContext.read.json(sqlContext.sparkContext.parallelize(
       """{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
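To make the one-line CacheManager change concrete, here is a minimal, self-contained Scala sketch of the failure mode and of the shape of the fix. Every name in it (Spark7158Sketch, cachedPlans, the toy QueryExecution and DataFrame) is a hypothetical stand-in, not one of Spark's real classes; only the lazy-val freezing behavior mirrors what the commit message describes.

object Spark7158Sketch {
  // Stand-in for the cache manager's registry of cached plans.
  var cachedPlans: Set[String] = Set.empty

  class QueryExecution(logical: String) {
    // Like Spark's `withCachedData`, this is a lazy val: evaluated exactly
    // once on first access, then frozen, so cache entries registered later
    // are invisible to it.
    lazy val withCachedData: String =
      if (cachedPlans.contains(logical)) s"InMemoryScan($logical)" else logical
  }

  class DataFrame(logical: String) {
    lazy val queryExecution = new QueryExecution(logical)

    def cache(): this.type = {
      // Pre-fix behavior: reading `queryExecution.withCachedData` here would
      // freeze this DataFrame's plan *before* the cache entry exists.
      // The fix mirrors `sqlContext.executePlan(query.logicalPlan)`: build a
      // throwaway QueryExecution instead, leaving `queryExecution` unforced.
      val planToCache = new QueryExecution(logical).withCachedData
      println(s"materializing cache from: $planToCache")
      cachedPlans += logical
      this
    }

    // The first action forces `queryExecution`; with the fix this happens
    // only after cache() has registered the plan, so the cache is visible.
    def collect(): String = queryExecution.withCachedData
  }

  def main(args: Array[String]): Unit = {
    val df = new DataFrame("plan")
    df.cache()
    println(df.collect()) // "InMemoryScan(plan)": read from the cache
  }
}

Running the sketch prints "InMemoryScan(plan)". If cache() instead forced df.queryExecution before registering the plan (the pre-fix order of events), collect() would print the raw plan, i.e. recompute it.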

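A note on the design, judging from the squashed-commit list: two more invasive approaches were tried first (making DataFrame.queryExecution mutable for cache/persist/unpersist, then hiding queryExecution from the API) before settling on the one-line CacheManager change, which leaves DataFrame untouched and simply re-derives an executed plan from the logical plan at cache time.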