Repository: incubator-griffin Updated Branches: refs/heads/master 23a3e999e -> 03a2efca4
data frame cache unit test Author: Lionel Liu <[email protected]> Author: dodobel <[email protected]> Closes #303 from bhlx3lyx7/spark2. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/03a2efca Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/03a2efca Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/03a2efca Branch: refs/heads/master Commit: 03a2efca49e1f464c824b101fad732fa82184ef5 Parents: 23a3e99 Author: Lionel Liu <[email protected]> Authored: Thu Jun 14 18:13:10 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Thu Jun 14 18:13:10 2018 +0800 ---------------------------------------------------------------------- measure/pom.xml | 9 +++ .../measure/context/DataFrameCache.scala | 5 +- .../measure/context/DataFrameCacheTest.scala | 69 ++++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/03a2efca/measure/pom.xml ---------------------------------------------------------------------- diff --git a/measure/pom.xml b/measure/pom.xml index 5b95025..c4906f6 100644 --- a/measure/pom.xml +++ b/measure/pom.xml @@ -52,6 +52,7 @@ under the License. <log4j.version>1.2.16</log4j.version> <curator.version>2.10.0</curator.version> <scalamock.version>3.6.0</scalamock.version> + <spark.testing.version>0.9.0</spark.testing.version> </properties> <dependencies> @@ -173,6 +174,14 @@ under the License. 
<scope>test</scope> </dependency> + <!--spark testing--> + <dependency> + <groupId>com.holdenkarau</groupId> + <artifactId>spark-testing-base_${scala.binary.version}</artifactId> + <version>${spark.version}_${spark.testing.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/03a2efca/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala index 765dc6f..38227eb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala @@ -29,8 +29,8 @@ import scala.collection.mutable.{MutableList, Map => MutableMap} */ case class DataFrameCache() extends Loggable { - private val dataFrames: MutableMap[String, DataFrame] = MutableMap() - private val trashDataFrames: MutableList[DataFrame] = MutableList() + val dataFrames: MutableMap[String, DataFrame] = MutableMap() + val trashDataFrames: MutableList[DataFrame] = MutableList() private def trashDataFrame(df: DataFrame): Unit = { trashDataFrames += df @@ -67,6 +67,7 @@ case class DataFrameCache() extends Loggable { def clearAllTrashDataFrames(): Unit = { trashDataFrames.foreach(_.unpersist) + trashDataFrames.clear } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/03a2efca/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala new file mode 100644 index 0000000..6d6375e --- 
/dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.scalatest._
+
+/** Tests that [[DataFrameCache]] tracks cached frames by name and moves uncached ones to the trash. */
+class DataFrameCacheTest extends FlatSpec with Matchers with DataFrameSuiteBase {
+
+  /** One row per element of `arr`, with schema (id: Long, name: String, age: Int). */
+  def createDataFrame(arr: Seq[Int]): DataFrame = {
+    val schema = StructType(Array(
+      StructField("id", LongType),
+      StructField("name", StringType),
+      StructField("age", IntegerType)
+    ))
+    val rows = arr.map { i =>
+      Row(i.toLong, s"name_$i", i + 15)
+    }
+    val rowRdd = sqlContext.sparkContext.parallelize(rows)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+
+  "data frame cache" should "be able to cache and uncache data frames" in {
+    val dfCache = DataFrameCache()
+    val df1 = createDataFrame(1 to 5)
+    val df2 = createDataFrame(1 to 10)
+    val df3 = createDataFrame(1 to 15)
+
+    // cache
+    dfCache.cacheDataFrame("t1", df1)
+    dfCache.cacheDataFrame("t2", df2)
+    dfCache.cacheDataFrame("t3", df3)
+    dfCache.dataFrames.get("t2") should be (Some(df2))
+
+    // uncache
+    dfCache.uncacheDataFrame("t2")
+    dfCache.dataFrames.get("t2") should be (None)
+    dfCache.trashDataFrames.toList should be (df2 :: Nil)
+
+    // uncache all (t1/t3 trash order presumably follows the unordered map, so ignore order)
+    dfCache.uncacheAllDataFrames()
+    dfCache.dataFrames.toMap should be (Map[String, DataFrame]())
+    dfCache.trashDataFrames.toList should contain theSameElementsAs (df1 :: df2 :: df3 :: Nil)
+
+    // clear all trash
+    dfCache.clearAllTrashDataFrames()
+    dfCache.trashDataFrames.toList should be (Nil)
+  }
+}
