Repository: spark
Updated Branches:
  refs/heads/master 337c16d57 -> 767cc94ca


[SPARK-7158] [SQL] Fix bug of cached data cannot be used in collect() after 
cache()

When df.cache() method called, the `withCachedData` of `QueryExecution` has 
been created, which mean it will not look up the cached tables when action 
method called afterward.

Author: Cheng Hao <[email protected]>

Closes #5714 from chenghao-intel/SPARK-7158 and squashes the following commits:

58ea8aa [Cheng Hao] style issue
2bf740f [Cheng Hao] create new QueryExecution instance for CacheManager
a5647d9 [Cheng Hao] hide the queryExecution of DataFrame
fbfd3c5 [Cheng Hao] make the DataFrame.queryExecution mutable for 
cache/persist/unpersist


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767cc94c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767cc94c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767cc94c

Branch: refs/heads/master
Commit: 767cc94ca6d397ba19226996ccb3c8e57083c549
Parents: 337c16d
Author: Cheng Hao <[email protected]>
Authored: Thu Jun 11 18:01:32 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Jun 11 18:01:47 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/CacheManager.scala      |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 26 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/767cc94c/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5fcc48a..a4b38d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) 
extends Logging {
             sqlContext.conf.useCompression,
             sqlContext.conf.columnBatchSize,
             storageLevel,
-            query.queryExecution.executedPlan,
+            sqlContext.executePlan(query.logicalPlan).executedPlan,
             tableName))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/767cc94c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3ca5ff3..14ecd4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -123,6 +123,32 @@ class SQLQuerySuite extends QueryTest with 
BeforeAndAfterAll with SQLTestUtils {
     )
   }
 
+  test("SPARK-7158 collect and take return different results") {
+    import java.util.UUID
+    import org.apache.spark.sql.types._
+
+    val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
+    // we except the id is materialized once
+    def id: () => String = () => { UUID.randomUUID().toString() }
+
+    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    // Make a new DataFrame (actually the same reference to the old one)
+    val cached = dfWithId.cache()
+    // Trigger the cache
+    val d0 = dfWithId.collect()
+    val d1 = cached.collect()
+    val d2 = cached.collect()
+
+    // Since the ID is only materialized once, then all of the records
+    // should come from the cache, not by re-computing. Otherwise, the ID
+    // will be different
+    assert(d0.map(_(0)) === d2.map(_(0)))
+    assert(d0.map(_(1)) === d2.map(_(1)))
+
+    assert(d1.map(_(0)) === d2.map(_(0)))
+    assert(d1.map(_(1)) === d2.map(_(1)))
+  }
+
   test("grouping on nested fields") {
     sqlContext.read.json(sqlContext.sparkContext.parallelize(
       """{"nested": {"attribute": 1}, "value": 2}""" :: Nil))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to