Repository: incubator-griffin Updated Branches: refs/heads/master fdcd7f75d -> 9a6dc4ba7
add regex replace udf Author: Lionel Liu <[email protected]> Closes #216 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/9a6dc4ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/9a6dc4ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/9a6dc4ba Branch: refs/heads/master Commit: 9a6dc4ba7683119c68a939245edea5de4986fadb Parents: fdcd7f7 Author: Lionel Liu <[email protected]> Authored: Wed Feb 7 18:23:03 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Wed Feb 7 18:23:03 2018 +0800 ---------------------------------------------------------------------- .../source/cache/ParquetDataSourceCache.scala | 2 +- .../griffin/measure/rule/udf/GriffinUdfs.scala | 13 +++-- .../resources/_profiling-batch-griffindsl.json | 14 ++++-- .../measure/rule/udf/GriffinUdfsTest.scala | 50 ++++++++++++++++++++ 4 files changed, 71 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9a6dc4ba/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala index 1761f56..89cd0b7 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala @@ -25,7 +25,7 @@ case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any ) extends DataSourceCache { override def init(): Unit = { - sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); + sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") } def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9a6dc4ba/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala index 37d2a5a..1d9eb8b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala @@ -23,16 +23,21 @@ import org.apache.spark.sql.SQLContext object GriffinUdfs { def register(sqlContext: SQLContext): Unit = { - sqlContext.udf.register("index_of", indexOf) - sqlContext.udf.register("matches", matches) + sqlContext.udf.register("index_of", indexOf _) + sqlContext.udf.register("matches", matches _) + sqlContext.udf.register("reg_replace", regReplace _) } - private val indexOf = (arr: Seq[String], v: String) => { + private def indexOf(arr: Seq[String], v: String) = { arr.indexOf(v) } - private val matches = (s: String, regex: String) => { + private def matches(s: String, regex: String) = { s.matches(regex) } + private def regReplace(s: String, regex: String, replacement: String) = { + s.replaceAll(regex, replacement) + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9a6dc4ba/measure/src/test/resources/_profiling-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json index 043ba85..fec178d 100644 --- a/measure/src/test/resources/_profiling-batch-griffindsl.json +++ b/measure/src/test/resources/_profiling-batch-griffindsl.json @@ -14,7 +14,14 @@ "version": "1.7", "config": { "file.name": "src/test/resources/users_info_src.avro" - } + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select reg_replace(email, '^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from ${this}" + } + ] } ] } @@ -26,9 +33,10 @@ "dsl.type": "griffin-dsl", "dq.type": "profiling", "name": "prof", - "rule": "count(*) from source", + "rule": "email, count(*) from source group by email", "metric": { - "name": "prof" + "name": "prof", + "collect.type": "array" } }, { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9a6dc4ba/measure/src/test/scala/org/apache/griffin/measure/rule/udf/GriffinUdfsTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/udf/GriffinUdfsTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/udf/GriffinUdfsTest.scala new file mode 100644 index 0000000..7f74716 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/rule/udf/GriffinUdfsTest.scala @@ -0,0 +1,50 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.udf + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +//import org.scalatest.FlatSpec +import org.scalatest.PrivateMethodTester +//import org.scalamock.scalatest.MockFactory + +@RunWith(classOf[JUnitRunner]) +class GriffinUdfsTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { + + test ("test indexOf") { + val inv = new Invocation[Int]('indexOf, "a" :: "b" :: "c" :: Nil, "b") + GriffinUdfs.invokePrivate(inv) should be (1) + } + + test ("test matches") { + val inv = new Invocation[Boolean]('matches, "s123", "^s\\d+$") + GriffinUdfs.invokePrivate(inv) should be (true) + } + + test ("test regexSubstr") { + val str = "https://www.abc.com/test/dp/B023/ref=sr_1_1/123-456?id=123" + val regexStr = """^([^/]+://[^/]+)(?:/[^/]+)?(/dp/[A-Z0-9]+)(?:/.*)?$""" + val replacement = "$1$2" + val inv = new Invocation[String]('regReplace, str, regexStr, replacement) + GriffinUdfs.invokePrivate(inv) should be ("https://www.abc.com/dp/B023") + } + +}
