Repository: incubator-griffin
Updated Branches:
  refs/heads/master 564d4ce21 -> ddff93bf0


A UDF test case

A local standalone Spark context is instantiated to test the MeanUdaf UDF.

Author: panffeng <[email protected]>

Closes #262 from panffeng/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ddff93bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ddff93bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ddff93bf

Branch: refs/heads/master
Commit: ddff93bf0234798bb3e0ed1873282aa96351b222
Parents: 564d4ce
Author: panffeng <[email protected]>
Authored: Tue Apr 24 14:40:58 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Tue Apr 24 14:40:58 2018 +0800

----------------------------------------------------------------------
 .../griffin/measure/rule/udf/MeanUdafTest.scala | 98 ++++++++++++++++++++
 1 file changed, 98 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ddff93bf/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala 
b/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala
new file mode 100644
index 0000000..4f882de
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/rule/udf/MeanUdafTest.scala
@@ -0,0 +1,98 @@
+package org.apache.griffin.measure.rule.udf
+
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.FunSuite
+
+
+class MeanUdafTest extends FunSuite  {
+  // Exercises the MeanUdaf aggregate through Spark SQL on a local SparkContext.
+  val sparkConf = new SparkConf()
+  sparkConf.setMaster("local")
+  //sparkConf.setSparkHome(System.getenv("SPARK_HOME"))
+  sparkConf.setAppName("UDFTest")
+  // NOTE(review): this context is never stopped anywhere in the visible code.
+  val sparkContext = new SparkContext(sparkConf)
+
+  implicit val sqlContext = new SQLContext(sparkContext)
+  // Brings in the implicits that provide .toDF on the Seq fixtures below.
+  import sqlContext.implicits._
+
+
+  test("test the udf"){
+    // Register the aggregate under the SQL name used by the queries below.
+    sqlContext.udf.register("my_mean", new MeanUdaf)
+
+    // Fixture containing nulls; registered as agg1 but not queried in this test (the udf will not work with nulls).
+    val data1 = Seq[(Integer, Integer)](
+      (1, 10),
+      (null, -60),
+      (1, 20),
+      (1, 30),
+      (2, 0),
+      (null, -10),
+      (2, -1),
+      (2, null),
+      (2, null),
+      (null, 100),
+      (3, null),
+      (null, null),
+      (3, null)).toDF("key", "value")
+    data1.registerTempTable("agg1")
+    // Null-free fixture with three keys; this is the table the queries actually read.
+    val data2 = Seq[(Integer, Integer)](
+      (1, 10),
+      (1, -60),
+      (1, 20),
+      (1, 30),
+      (2, 0),
+      (2, -10),
+      (2, -1),
+      (2, -5),
+      (2, 5),
+      (3, 100),
+      (3, -10),
+      (3, 10),
+      (3, 10)).toDF("key", "value")
+    data2.registerTempTable("agg2")
+
+    // Per key: the UDAF result next to the built-in avg, the row count and the sum.
+    val nullFreeDf = sqlContext.sql(
+
+      """
+        |SELECT
+        |  key,
+        |  my_mean(value) as udfVal,
+        |  avg(value) as defaultVal,
+        |  count(*) as totalCount,
+        |  sum(value) as totalSum
+        |FROM agg2
+        |GROUP BY key
+      """.stripMargin).toDF()
+
+    // Expose the aggregated rows as agg0 for the comparison query below.
+    nullFreeDf.registerTempTable("agg0")
+    nullFreeDf.show()
+    // Count the keys where the UDAF disagrees with avg(value) and with totalSum/totalCount.
+    val result = sqlContext.sql(
+
+      """
+        |SELECT
+        | SUM(IF(udfVal=defaultVal, 0, 1)) equalCols,
+        | SUM(IF(udfVal=totalSum/totalCount, 0, 1)) avgCols
+        |FROM agg0
+      """.stripMargin).toDF()
+
+    result.show()
+    // Aggregation without GROUP BY: exactly one row, and both mismatch counters must be zero.
+    assert(result.count() === 1)
+
+    assert(result.head().getLong(0) === 0L)
+    assert(result.head().getLong(1) === 0L)
+
+  }
+
+
+
+}

Reply via email to