Repository: incubator-griffin Updated Branches: refs/heads/master 564d4ce21 -> ddff93bf0
/*
 * Patch-email metadata for the commit that introduced this file (kept verbatim for provenance):
 *
 * a udf testcase a local standalone spark is instantiated to test a udf.
 * Author: panffeng <[email protected]>
 * Closes #262 from panffeng/master.
 * Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
 * Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ddff93bf
 * Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ddff93bf
 * Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ddff93bf
 * Branch: refs/heads/master
 * Commit: ddff93bf0234798bb3e0ed1873282aa96351b222
 * Parents: 564d4ce
 * Author: panffeng <[email protected]>
 * Authored: Tue Apr 24 14:40:58 2018 +0800
 * Committer: Lionel Liu <[email protected]>
 * Committed: Tue Apr 24 14:40:58 2018 +0800
 * ----------------------------------------------------------------------
 * .../griffin/measure/rule/udf/MeanUdafTest.scala | 98 ++++++++++++++++++++
 * 1 file changed, 98 insertions(+)
 * ----------------------------------------------------------------------
 * http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ddff93bf/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala
 * ----------------------------------------------------------------------
 * diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala
 * new file mode 100644
 * index 0000000..4f882de
 * --- /dev/null
 * +++ b/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala
 * @@ -0,0 +1,98 @@
 */
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.griffin.measure.rule.udf

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

/**
 * Verifies the [[MeanUdaf]] aggregate (registered in SQL as `my_mean`) on a local
 * Spark instance by comparing its per-key result against Spark's built-in `avg`
 * and against an explicit `sum(value) / count(*)`.
 */
class MeanUdafTest extends FunSuite with BeforeAndAfterAll {

  val sparkConf: SparkConf = new SparkConf()
    .setMaster("local")
    .setAppName("UDFTest")

  val sparkContext: SparkContext = new SparkContext(sparkConf)

  val sqlContext: SQLContext = new SQLContext(sparkContext)

  import sqlContext.implicits._

  /** Stops the shared SparkContext so it does not leak into other suites in the same JVM. */
  override protected def afterAll(): Unit = {
    try sparkContext.stop()
    finally super.afterAll()
  }

  test("my_mean agrees with avg and sum/count on null-free data") {

    sqlContext.udf.register("my_mean", new MeanUdaf)

    // my_mean is not expected to cope with null inputs, so the fixture is deliberately null-free.
    val input = Seq[(Int, Int)](
      (1, 10), (1, -60), (1, 20), (1, 30),
      (2, 0), (2, -10), (2, -1), (2, -5), (2, 5),
      (3, 100), (3, -10), (3, 10), (3, 10)
    ).toDF("key", "value")
    input.registerTempTable("agg2")

    // One row per key: the UDAF result next to two independently computed reference means.
    val perKey = sqlContext.sql(
      """
        |SELECT
        |  key,
        |  my_mean(value) AS udfVal,
        |  avg(value) AS defaultVal,
        |  count(*) AS totalCount,
        |  sum(value) AS totalSum
        |FROM agg2
        |GROUP BY key
      """.stripMargin)
    perKey.registerTempTable("agg0")

    // Count the keys whose UDAF value differs from each reference by more than 1e-9.
    // The tolerance guards against floating-point rounding differences; the IF(cond, 0, 1)
    // shape makes a NULL comparison (e.g. a NULL udfVal) count as a mismatch.
    val mismatches = sqlContext.sql(
      """
        |SELECT
        |  SUM(IF(ABS(udfVal - defaultVal) <= 0.000000001, 0, 1)) AS avgMismatches,
        |  SUM(IF(ABS(udfVal - totalSum / totalCount) <= 0.000000001, 0, 1)) AS sumCountMismatches
        |FROM agg0
      """.stripMargin)

    // Collect once instead of re-running the query for every assertion.
    val rows = mismatches.collect()
    assert(rows.length === 1)

    val row = rows(0)
    val details = perKey.collect().mkString("; ")
    assert(row.getLong(0) === 0L, s"my_mean differs from avg: $details")
    assert(row.getLong(1) === 0L, s"my_mean differs from sum/count: $details")
  }
}
