Repository: incubator-griffin
Updated Branches:
  refs/heads/master 23a3e999e -> 03a2efca4


data frame cache unit test

Author: Lionel Liu <[email protected]>
Author: dodobel <[email protected]>

Closes #303 from bhlx3lyx7/spark2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/03a2efca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/03a2efca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/03a2efca

Branch: refs/heads/master
Commit: 03a2efca49e1f464c824b101fad732fa82184ef5
Parents: 23a3e99
Author: Lionel Liu <[email protected]>
Authored: Thu Jun 14 18:13:10 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Thu Jun 14 18:13:10 2018 +0800

----------------------------------------------------------------------
 measure/pom.xml                                 |  9 +++
 .../measure/context/DataFrameCache.scala        |  5 +-
 .../measure/context/DataFrameCacheTest.scala    | 69 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/03a2efca/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index 5b95025..c4906f6 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -52,6 +52,7 @@ under the License.
         <log4j.version>1.2.16</log4j.version>
         <curator.version>2.10.0</curator.version>
         <scalamock.version>3.6.0</scalamock.version>
+        <spark.testing.version>0.9.0</spark.testing.version>
     </properties>
 
     <dependencies>
@@ -173,6 +174,14 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!--spark testing-->
+        <dependency>
+            <groupId>com.holdenkarau</groupId>
+            <artifactId>spark-testing-base_${scala.binary.version}</artifactId>
+            <version>${spark.version}_${spark.testing.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/03a2efca/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
index 765dc6f..38227eb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -29,8 +29,8 @@ import scala.collection.mutable.{MutableList, Map => MutableMap}
   */
 case class DataFrameCache() extends Loggable {
 
-  private val dataFrames: MutableMap[String, DataFrame] = MutableMap()
-  private val trashDataFrames: MutableList[DataFrame] = MutableList()
+  val dataFrames: MutableMap[String, DataFrame] = MutableMap()
+  val trashDataFrames: MutableList[DataFrame] = MutableList()
 
   private def trashDataFrame(df: DataFrame): Unit = {
     trashDataFrames += df
@@ -67,6 +67,7 @@ case class DataFrameCache() extends Loggable {
 
   def clearAllTrashDataFrames(): Unit = {
     trashDataFrames.foreach(_.unpersist)
+    trashDataFrames.clear
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/03a2efca/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
new file mode 100644
index 0000000..6d6375e
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.scalatest._
+
+class DataFrameCacheTest extends FlatSpec with Matchers with DataFrameSuiteBase {
+
+  def createDataFrame(arr: Seq[Int]): DataFrame = {
+    val schema = StructType(Array(
+      StructField("id", LongType),
+      StructField("name", StringType),
+      StructField("age", IntegerType)
+    ))
+    val rows = arr.map { i =>
+      Row(i.toLong, s"name_$i", i + 15)
+    }
+    val rowRdd = sqlContext.sparkContext.parallelize(rows)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  "data frame cache" should "be able to cache and uncache data frames" in {
+    val dfCache = DataFrameCache()
+    val df1 = createDataFrame(1 to 5)
+    val df2 = createDataFrame(1 to 10)
+    val df3 = createDataFrame(1 to 15)
+
+    // cache
+    dfCache.cacheDataFrame("t1", df1)
+    dfCache.cacheDataFrame("t2", df2)
+    dfCache.cacheDataFrame("t3", df3)
+    dfCache.dataFrames.get("t2") should be (Some(df2))
+
+    // uncache
+    dfCache.uncacheDataFrame("t2")
+    dfCache.dataFrames.get("t2") should be (None)
+    dfCache.trashDataFrames.toList should be (df2 :: Nil)
+
+    // uncache all
+    dfCache.uncacheAllDataFrames()
+    dfCache.dataFrames.toMap should be (Map[String, DataFrame]())
+    dfCache.trashDataFrames.toList should be (df2 :: df1 :: df3 :: Nil)
+
+    // clear all trash
+    dfCache.clearAllTrashDataFrames()
+    dfCache.trashDataFrames.toList should be (Nil)
+  }
+
+
+}

Reply via email to