http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala new file mode 100644 index 0000000..fec8c49 --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala @@ -0,0 +1,265 @@ +package org.apache.griffin.measure.batch.utils + +import scala.util.{Success, Try} + + +object CalculationUtil { + + implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v) + + case class CalculationValue(value: Option[_]) extends Serializable { + + def + (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString) + case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble) + case (None, Some(v2)) => other + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + def - (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + def * (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2) + case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt) + case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + def / (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + def % (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: Byte), Some(v2)) => Some(v1 % 
v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + def unary_- (): Option[_] = { + value match { + case Some(v: String) => Some(v.reverse.toString) + case Some(v: Boolean) => Some(!v) + case Some(v: Byte) => Some(-v) + case Some(v: Short) => Some(-v) + case Some(v: Int) => Some(-v) + case Some(v: Long) => Some(-v) + case Some(v: Float) => Some(-v) + case Some(v: Double) => Some(-v) + case Some(v) => Some(v) + case _ => None + } + } + + + def === (other: Option[_]): Option[Boolean] = { + (value, other) match { + case (Some(v1), Some(v2)) => Some(v1 == v2) + case _ => None + } + } + + def =!= (other: Option[_]): Option[Boolean] = { + (value, other) match { + case (Some(v1), Some(v2)) => Some(v1 != v2) + case _ => None + } + } + + def > (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 > v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble) + case _ => None + } + } match { + case Success(opt) => opt + case _ => None + } + } + + def >= (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case _ => None + } + } match { + case Success(opt) => opt + case _ => None + } + } + + def < (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 < v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble) + case _ => None + } + } match { + case Success(opt) => opt + case _ => None + } + } + + def <= (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 
<= v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case _ => None + } + } match { + case Success(opt) => opt + case _ => None + } + } + + + def in (other: Iterable[Option[_]]): Option[Boolean] = { + other.foldLeft(Some(false): Option[Boolean]) { (res, next) => + optOr(res, ===(next)) + } + } + + def not_in (other: Iterable[Option[_]]): Option[Boolean] = { + other.foldLeft(Some(true): Option[Boolean]) { (res, next) => + optAnd(res, =!=(next)) + } + } + + def between (other: Iterable[Option[_]]): Option[Boolean] = { + if (other.size < 2) None else { + val (begin, end) = (other.head, other.tail.head) + optAnd(>=(begin), <=(end)) + } + } + + def not_between (other: Iterable[Option[_]]): Option[Boolean] = { + if (other.size < 2) None else { + val (begin, end) = (other.head, other.tail.head) + optOr(<(begin), >(end)) + } + } + + def unary_! (): Option[Boolean] = { + optNot(value) + } + + def && (other: Option[_]): Option[Boolean] = { + optAnd(value, other) + } + + def || (other: Option[_]): Option[Boolean] = { + optOr(value, other) + } + + + private def optNot(a: Option[_]): Option[Boolean] = { + a match { + case Some(v: Boolean) => Some(!v) + case _ => None + } + } + private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = { + (a, b) match { + case (Some(false), _) | (_, Some(false)) => Some(false) + case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2) + case _ => None + } + } + private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = { + (a, b) match { + case (Some(true), _) | (_, Some(true)) => Some(true) + case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2) + case _ => None + } + } + } + +}
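A note on the semantics encoded above: in every binary operator of CalculationValue, the left operand's runtime type picks the coercion (the right operand is pushed through toString), and a failed conversion inside the Try falls back to the left value for arithmetic, or to None for comparisons. A minimal REPL-style sketch of that behavior, with made-up values (only the import is from the commit):

    import org.apache.griffin.measure.batch.utils.CalculationUtil._

    val i: Option[_] = Some(10)
    i + Some("5")      // Some(15): Int on the left, so "5".toInt on the right
    Some("ab") + i     // Some("ab10"): String on the left means concatenation
    i + Some("x")      // Some(10): "x".toInt throws, so + falls back to the left value
    None + i           // Some(10): a None left operand yields the right operand (for + only)
    i > Some(3)        // Some(true): comparisons coerce both sides via toString.toDouble
    i > Some("abc")    // None: a failed comparison coercion yields None, not false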
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala new file mode 100644 index 0000000..b48478a --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala @@ -0,0 +1,62 @@ +package org.apache.griffin.measure.batch.utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} + +object HdfsUtil { + + private val separator = "/" + + private val conf = new Configuration() + conf.set("dfs.support.append", "true") + + private val dfs = FileSystem.get(conf) + + def existPath(filePath: String): Boolean = { + val path = new Path(filePath) + dfs.exists(path) + } + + def createFile(filePath: String): FSDataOutputStream = { + val path = new Path(filePath) + if (dfs.exists(path)) dfs.delete(path, true) + dfs.create(path) + } + + def appendOrCreateFile(filePath: String): FSDataOutputStream = { + val path = new Path(filePath) + if (dfs.exists(path)) dfs.append(path) else createFile(filePath) + } + + def openFile(filePath: String): FSDataInputStream = { + val path = new Path(filePath) + dfs.open(path) + } + + def writeContent(filePath: String, message: String): Unit = { + val out = createFile(filePath) + out.write(message.getBytes("utf-8")) + out.close() + } + + def appendContent(filePath: String, message: String): Unit = { + val out = appendOrCreateFile(filePath) + out.write(message.getBytes("utf-8")) + out.close() + } + + def createEmptyFile(filePath: String): Unit = { + val out = createFile(filePath) + out.close() + } + + + def getHdfsFilePath(parentPath: String, fileName: String): String = { + if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName + } + + def deleteHdfsPath(dirPath: String): Unit = { + val path = new Path(dirPath) + if (dfs.exists(path)) dfs.delete(path, true) + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala new file mode 100644 index 0000000..747d0fa --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala @@ -0,0 +1,30 @@ +package org.apache.griffin.measure.batch.utils + +import scalaj.http._ + +object HttpUtil { + + val GET_REGEX = """^(?i)get$""".r + val POST_REGEX = """^(?i)post$""".r + val PUT_REGEX = """^(?i)put$""".r + val DELETE_REGEX = """^(?i)delete$""".r + + def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = { + val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString + response.code.toString + } + + def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = { + val httpReq = 
Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)) + method match { + case GET_REGEX() => httpReq.asString.code.toString + case POST_REGEX() => httpReq.postData(data).asString.code.toString + case PUT_REGEX() => httpReq.put(data).asString.code.toString + case DELETE_REGEX() => httpReq.method("DELETE").asString.code.toString + case _ => "wrong method" + } + } + + private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = { + map.map(pair => pair._1 -> pair._2.toString) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala new file mode 100644 index 0000000..cdd470a --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala @@ -0,0 +1,32 @@ +package org.apache.griffin.measure.batch.utils + +import java.io.InputStream + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import scala.reflect._ + +object JsonUtil { + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + + def toJson(value: Map[Symbol, Any]): String = { + toJson(value map { case (k, v) => k.name -> v }) + } + + def toJson(value: Any): String = { + mapper.writeValueAsString(value) + } + + def toMap[V](json: String)(implicit m: Manifest[V]) = fromJson[Map[String, V]](json) + + def fromJson[T: ClassTag](json: String)(implicit m: Manifest[T]): T = { + mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + } + + def fromJson[T: ClassTag](is: InputStream)(implicit m: Manifest[T]): T = { + mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala new file mode 100644 index 0000000..7f2b355 --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala @@ -0,0 +1,10 @@ +package org.apache.griffin.measure.batch.utils + +object StringParseUtil { + + def sepStrings(str: String, sep: String): Iterable[String] = { + val strings = str.split(sep) + strings.map(_.trim) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/config.json ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/config.json b/measure/measure-batch/src/test/resources/config.json new file mode 100644 index 0000000..65e0ed9 --- /dev/null +++ b/measure/measure-batch/src/test/resources/config.json @@ -0,0 +1,25 @@ +{ + "name": "accu1", + "type": "accuracy", + + "source": { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + }, + + "target": { + "type": "avro", + "version": "1.7", + 
"config": { + "file.name": "src/test/resources/users_info_target.avro" + } + }, + + "evaluateRule": { + "sampleRatio": 1, + "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/config1.json ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/config1.json b/measure/measure-batch/src/test/resources/config1.json new file mode 100644 index 0000000..d7290ba --- /dev/null +++ b/measure/measure-batch/src/test/resources/config1.json @@ -0,0 +1,27 @@ +{ + "name": "accu-test", + "type": "accuracy", + + "source": { + "type": "hive", + "version": "1.2", + "config": { + "table.name": "rheos_view_event", + "partitions": "dt=20170410, hour=15" + } + }, + + "target": { + "type": "hive", + "version": "1.2", + "config": { + "table.name": "be_view_event_queue", + "partitions": "dt=20170410, hour=15; dt=20170410, hour=16" + } + }, + + "evaluateRule": { + "sampleRatio": 1, + "rules": "@Key ${source}['uid'] === ${target}['uid']; @Key ${source}['eventtimestamp'] === ${target}['eventtimestamp']; ${source}['page_id'] === ${target}['page_id']; ${source}['site_id'] === ${target}['site_id']; ${source}['itm'] === ${target}['itm']" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/env.json ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/env.json b/measure/measure-batch/src/test/resources/env.json new file mode 100644 index 0000000..3a9e38c --- /dev/null +++ b/measure/measure-batch/src/test/resources/env.json @@ -0,0 +1,27 @@ +{ + "spark": { + "log.level": "ERROR", + "checkpoint.dir": "hdfs:///griffin/batch/cp", + "config": {} + }, + + "persist": [ + { + "type": "hdfs", + "config": { + "path": "hdfs:///griffin/streaming/persist" + } + }, + { + "type": "http", + "config": { + "method": "post", + "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/" + } + } + ], + + "cleaner": { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/env1.json ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/env1.json b/measure/measure-batch/src/test/resources/env1.json new file mode 100644 index 0000000..a059715 --- /dev/null +++ b/measure/measure-batch/src/test/resources/env1.json @@ -0,0 +1,21 @@ +{ + "spark": { + "log.level": "INFO", + "checkpoint.dir": "hdfs:///griffin/batch/cp", + "config": {} + }, + + "persist": [ + { + "type": "hdfs", + "config": { + "path": "hdfs:///user/b_des/bark/griffin-batch/test", + "max.lines.per.file": 10000 + } + } + ], + + "cleaner": { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/log4j.properties 
b/measure/measure-batch/src/test/resources/log4j.properties new file mode 100644 index 0000000..bd31e15 --- /dev/null +++ b/measure/measure-batch/src/test/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_src.avro ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/users_info_src.avro b/measure/measure-batch/src/test/resources/users_info_src.avro new file mode 100644 index 0000000..3d5c939 Binary files /dev/null and b/measure/measure-batch/src/test/resources/users_info_src.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_src.dat ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/users_info_src.dat b/measure/measure-batch/src/test/resources/users_info_src.dat new file mode 100644 index 0000000..ce49443 --- /dev/null +++ b/measure/measure-batch/src/test/resources/users_info_src.dat @@ -0,0 +1,50 @@ +10001|Tom001|Jerrya|201 DisneyCity|[email protected]|10000001|94022 +10002|Tom002|Jerrya|202 DisneyCity|[email protected]|10000002|94022 +10003|Tom003|Jerrya|203 DisneyCity|[email protected]|10000003|94022 +10004|Tom004|Jerrya|204 DisneyCity|[email protected]|10000004|94022 +10005|Tom005|Jerrya|205 DisneyCity|[email protected]|10000005|94022 +10006|Tom006|Jerrya|206 DisneyCity|[email protected]|10000006|94022 +10007|Tom007|Jerrya|207 DisneyCity|[email protected]|10000007|94022 +10008|Tom008|Jerrya|208 DisneyCity|[email protected]|10000008|94022 +10009|Tom009|Jerrya|209 DisneyCity|[email protected]|10000009|94022 +10010|Tom010|Jerrya|210 DisneyCity|[email protected]|10000010|94022 +10011|Tom011|Jerrya|211 DisneyCity|[email protected]|10000011|94022 +10012|Tom012|Jerrya|212 DisneyCity|[email protected]|10000012|94022 +10013|Tom013|Jerrya|213 DisneyCity|[email protected]|10000013|94022 +10014|Tom014|Jerrya|214 DisneyCity|[email protected]|10000014|94022 +10015|Tom015|Jerrya|215 DisneyCity|[email protected]|10000015|94022 +10016|Tom016|Jerrya|216 DisneyCity|[email protected]|10000016|94022 +10017|Tom017|Jerrya|217 DisneyCity|[email protected]|10000017|94022 +10018|Tom018|Jerrya|218 DisneyCity|[email protected]|10000018|94022 +10019|Tom019|Jerrya|219 DisneyCity|[email protected]|10000019|94022 +10020|Tom020|Jerrya|220 DisneyCity|[email protected]|10000020|94022 +10021|Tom021|Jerrya|221 DisneyCity|[email protected]|10000021|94022 +10022|Tom022|Jerrya|222 DisneyCity|[email protected]|10000022|94022 +10023|Tom023|Jerrya|223 DisneyCity|[email protected]|10000023|94022 +10024|Tom024|Jerrya|224 DisneyCity|[email protected]|10000024|94022 +10025|Tom025|Jerrya|225 DisneyCity|[email protected]|10000025|94022 +10026|Tom026|Jerrya|226 DisneyCity|[email protected]|10000026|94022 +10027|Tom027|Jerrya|227 DisneyCity|[email protected]|10000027|94022 +10028|Tom028|Jerrya|228 DisneyCity|[email protected]|10000028|94022 +10029|Tom029|Jerrya|229 DisneyCity|[email protected]|10000029|94022 +10030|Tom030|Jerrya|230 DisneyCity|[email protected]|10000030|94022 
+10031|Tom031|Jerrya|231 DisneyCity|[email protected]|10000031|94022 +10032|Tom032|Jerrya|232 DisneyCity|[email protected]|10000032|94022 +10033|Tom033|Jerrya|233 DisneyCity|[email protected]|10000033|94022 +10034|Tom034|Jerrya|234 DisneyCity|[email protected]|10000034|94022 +10035|Tom035|Jerrya|235 DisneyCity|[email protected]|10000035|94022 +10036|Tom036|Jerrya|236 DisneyCity|[email protected]|10000036|94022 +10037|Tom037|Jerrya|237 DisneyCity|[email protected]|10000037|94022 +10038|Tom038|Jerrya|238 DisneyCity|[email protected]|10000038|94022 +10039|Tom039|Jerrya|239 DisneyCity|[email protected]|10000039|94022 +10040|Tom040|Jerrya|240 DisneyCity|[email protected]|10000040|94022 +10041|Tom041|Jerrya|241 DisneyCity|[email protected]|10000041|94022 +10042|Tom042|Jerrya|242 DisneyCity|[email protected]|10000042|94022 +10043|Tom043|Jerrya|243 DisneyCity|[email protected]|10000043|94022 +10044|Tom044|Jerrya|244 DisneyCity|[email protected]|10000044|94022 +10045|Tom045|Jerrya|245 DisneyCity|[email protected]|10000045|94022 +10046|Tom046|Jerrya|246 DisneyCity|[email protected]|10000046|94022 +10047|Tom047|Jerrya|247 DisneyCity|[email protected]|10000047|94022 +10048|Tom048|Jerrya|248 DisneyCity|[email protected]|10000048|94022 +10049|Tom049|Jerrya|249 DisneyCity|[email protected]|10000049|94022 +10050|Tom050|Jerrya|250 DisneyCity|[email protected]|10000050|94022 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_target.avro ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/users_info_target.avro b/measure/measure-batch/src/test/resources/users_info_target.avro new file mode 100644 index 0000000..104dd6c Binary files /dev/null and b/measure/measure-batch/src/test/resources/users_info_target.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_target.dat ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/resources/users_info_target.dat b/measure/measure-batch/src/test/resources/users_info_target.dat new file mode 100644 index 0000000..07a6b40 --- /dev/null +++ b/measure/measure-batch/src/test/resources/users_info_target.dat @@ -0,0 +1,50 @@ +10001|Tom001|Jerrya|201 DisneyCity|[email protected]|10000101|94022 +10002|Tom002|Jerrya|202 DisneyCity|[email protected]|10000102|94022 +10003|Tom003|Jerrya|203 DisneyCity|[email protected]|10000003|94022 +10004|Tom004|Jerrya|204 DisneyCity|[email protected]|10000004|94022 +10005|Tom005|Jerrya|205 DisneyCity|[email protected]|10000005|94022 +10006|Tom006|Jerrya|206 DisneyCity|[email protected]|10000006|94022 +10007|Tom007|Jerrya|207 DisneyCity|[email protected]|10000007|94022 +10008|Tom008|Jerrya|208 DisneyCity|[email protected]|10000008|94022 +10009|Tom009|Jerrya|209 DisneyCity|[email protected]|10000009|94022 +10010|Tom010|Jerrya|210 DisneyCity|[email protected]|10000010|94022 +10011|Tom011|Jerrya|211 DisneyCity|[email protected]|10000011|94022 +10012|Tom012|Jerrya|212 DisneyCity|[email protected]|10000012|94022 +10013|Tom013|Jerrya|213 DisneyCity|[email protected]|10000013|94022 +10014|Tom014|Jerrya|214 DisneyCity|[email protected]|10000014|94022 +10015|Tom015|Jerrya|215 DisneyCity|[email protected]|10000015|94022 +10016|Tom016|Jerrya|216 DisneyCity|[email protected]|10000016|94022 +10017|Tom017|Jerrya|217 DisneyCity|[email 
protected]|10000017|94022 +10018|Tom018|Jerrya|218 DisneyCity|[email protected]|10000018|94022 +10019|Tom019|Jerrya|219 DisneyCity|[email protected]|10000019|94022 +10020|Tom020|Jerrya|220 DisneyCity|[email protected]|10000020|94022 +10021|Tom021|Jerrya|221 DisneyCity|[email protected]|10000021|94022 +10022|Tom022|Jerrya|222 DisneyCity|[email protected]|10000022|94022 +10023|Tom023|Jerrya|223 DisneyCity|[email protected]|10000023|94022 +10024|Tom024|Jerrya|224 DisneyCity|[email protected]|10000024|94022 +10025|Tom025|Jerrya|225 DisneyCity|[email protected]|10000025|94022 +10026|Tom026|Jerrya|226 DisneyCity|[email protected]|10000026|94022 +10027|Tom027|Jerrya|227 DisneyCity|[email protected]|10000027|94022 +10028|Tom028|Jerrya|228 DisneyCity|[email protected]|10000028|94022 +10029|Tom029|Jerrya|229 DisneyCity|[email protected]|10000029|94022 +10030|Tom030|Jerrya|230 DisneyCity|[email protected]|10000030|94022 +10031|Tom031|Jerrya|231 DisneyCity|[email protected]|10000031|94022 +10032|Tom032|Jerrya|232 DisneyCity|[email protected]|10000032|94022 +10033|Tom033|Jerrya|233 DisneyCity|[email protected]|10000033|94022 +10034|Tom034|Jerrya|234 DisneyCity|[email protected]|10000034|94022 +10035|Tom035|Jerrya|235 DisneyCity|[email protected]|10000035|94022 +10036|Tom036|Jerrya|236 DisneyCity|[email protected]|10000036|94022 +10037|Tom037|Jerrya|237 DisneyCity|[email protected]|10000037|94022 +10038|Tom038|Jerrya|238 DisneyCity|[email protected]|10000038|94022 +10039|Tom039|Jerrya|239 DisneyCity|[email protected]|10000039|94022 +10040|Tom040|Jerrya|240 DisneyCity|[email protected]|10000040|94022 +10041|Tom041|Jerrya|241 DisneyCity|[email protected]|10000041|94022 +10042|Tom042|Jerrya|242 DisneyCity|[email protected]|10000042|94022 +10043|Tom043|Jerrya|243 DisneyCity|[email protected]|10000043|94022 +10044|Tom044|Jerrya|244 DisneyCity|[email protected]|10000044|94022 +10045|Tom045|Jerrya|245 DisneyCity|[email protected]|10000045|94022 +10046|Tom046|Jerrya|246 DisneyCity|[email protected]|10000046|94022 +10047|Tom047|Jerrya|247 DisneyCity|[email protected]|10000047|94022 +10048|Tom048|Jerrya|248 DisneyCity|[email protected]|10000048|94022 +10049|Tom049|Jerrya|249 DisneyCity|[email protected]|10000049|94022 +10050|Tom050|Jerrya|250 DisneyCity|[email protected]|10000050|94022 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala new file mode 100644 index 0000000..7448d54 --- /dev/null +++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala @@ -0,0 +1,223 @@ +package org.apache.griffin.measure.batch.algo + +import java.util.Date + +import org.apache.griffin.measure.batch.config.params._ +import org.apache.griffin.measure.batch.config.params.env._ +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.config.reader._ +import org.apache.griffin.measure.batch.config.validator._ +import org.apache.griffin.measure.batch.connector.{CacheDataUtil, DataConnector, DataConnectorFactory} +import org.apache.griffin.measure.batch.log.Loggable +import 
org.apache.griffin.measure.batch.rule.expr._ +import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +import scala.util.{Failure, Success, Try} + + +@RunWith(classOf[JUnitRunner]) +class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + val envFile = "src/test/resources/env.json" +// val confFile = "src/test/resources/config.json" + val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}" + val envFsType = "local" + val userFsType = "raw" + + val args = Array(envFile, confFile) + + var sc: SparkContext = _ + var sqlContext: SQLContext = _ + + var allParam: AllParam = _ + + before { + // read param files + val envParam = readParamFile[EnvParam](envFile, envFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + val userParam = readParamFile[UserParam](confFile, userFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + allParam = AllParam(envParam, userParam) + + // validate param files + validateParams(allParam) match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-3) + } + case _ => { + info("params validation pass") + } + } + + val metricName = userParam.name + val conf = new SparkConf().setMaster("local[*]").setAppName(metricName) + sc = new SparkContext(conf) + sqlContext = new SQLContext(sc) + } + + test("algorithm") { + Try { + val envParam = allParam.envParam + val userParam = allParam.userParam + + // start time + val startTime = new Date().getTime() + + // get spark application id + val applicationId = sc.applicationId + + // rules + val ruleFactory = RuleFactory(userParam.evaluateRuleParam) + val rule: StatementExpr = ruleFactory.generateRule() + val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) + + ruleAnalyzer.globalCacheExprs.foreach(println) + ruleAnalyzer.globalFinalCacheExprs.foreach(println) + + // global cache data + val globalCachedData = CacheDataUtil.genCachedMap(None, ruleAnalyzer.globalCacheExprs, Map[String, Any]()) + val globalFinalCachedData = CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, globalCachedData) + + // data connector + val sourceDataConnector: DataConnector = + DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam, + ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs, + ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData, + ruleAnalyzer.whenClauseExpr + ) match { + case Success(cntr) => { + if (cntr.available) cntr + else throw new Exception("source data not available!") + } + case Failure(ex) => throw ex + } + val 
targetDataConnector: DataConnector = + DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam, + ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs, + ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData, + ruleAnalyzer.whenClauseExpr + ) match { + case Success(cntr) => { + if (cntr.available) cntr + else throw new Exception("target data not available!") + } + case Failure(ex) => throw ex + } + + // get metadata +// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match { +// case Success(md) => md +// case Failure(ex) => throw ex +// } +// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match { +// case Success(md) => md +// case Failure(ex) => throw ex +// } + + // get data + val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match { + case Success(dt) => dt + case Failure(ex) => throw ex + } + val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match { + case Success(dt) => dt + case Failure(ex) => throw ex + } + +// println("-- global cache exprs --") +// ruleAnalyzer.globalCacheExprs.foreach(a => println(s"${a._id} ${a.desc}")) +// println("-- global cache data --") +// globalCachedData.foreach(println) +// println("-- global final cache data --") +// globalFinalCachedData.foreach(println) +// +// println("-- source persist exprs --") +// ruleAnalyzer.sourcePersistExprs.foreach(a => println(s"${a._id} ${a.desc}")) +// println("-- source cache exprs --") +// ruleAnalyzer.sourceCacheExprs.foreach(a => println(s"${a._id} ${a.desc}")) +// +// println("-- source --") +// sourceData.foreach { a => +// val printMap = ruleAnalyzer.sourcePersistExprs.flatMap { expr => +// a._2.get(expr._id) match { +// case Some(v) => Some((expr._id + expr.desc, v)) +// case _ => None +// } +// }.toMap +// println(printMap) +// +// val cacheMap = ruleAnalyzer.sourceCacheExprs.flatMap { expr => +// a._2.get(expr._id) match { +// case Some(v) => Some((expr._id + expr.desc, v)) +// case _ => None +// } +// }.toMap +// println(cacheMap) +// +// println(a) +// println(a._2.size) +// } + +// println("-- target --") +// targetData.foreach { a => +// val printMap = ruleAnalyzer.targetPersistExprs.flatMap { expr => +// a._2.get(expr._id) match { +// case Some(v) => Some((expr.desc, v)) +// case _ => None +// } +// }.toMap +// println(printMap) +// } + + // my algo + val algo = BatchAccuracyAlgo(allParam) + + // accuracy algorithm + val (accuResult, missingRdd, matchingRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer) + + println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}") + + missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourcePersistExprs, ruleAnalyzer.targetPersistExprs)).foreach(println) + + // end time + val endTime = new Date().getTime + println(s"using time: ${endTime - startTime} ms") + } match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-4) + } + case _ => { + info("calculation finished") + } + } + } + + private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { + val paramReader = ParamReaderFactory.getParamReader(file, fsType) + paramReader.readConfig[T] + } + + private def validateParams(allParam: AllParam): Try[Boolean] = { + val allParamValidator = AllParamValidator() + allParamValidator.validate(allParam) + } + +} 
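The test above passes confFile as a raw JSON string (userFsType is "raw") and binds it into UserParam through readParamFile, which ultimately runs through the JsonUtil object introduced in this commit. A self-contained sketch of that binding step, using hypothetical stand-in case classes rather than the real param classes:

    import org.apache.griffin.measure.batch.utils.JsonUtil

    // illustrative stand-ins for UserParam / EvaluateRuleParam, not the real classes
    case class MiniRuleParam(sampleRatio: Double, rules: String)
    case class MiniUserParam(name: String, `type`: String, evaluateRule: MiniRuleParam)

    val json = """{"name":"accu1","type":"accuracy","source":{"type":"avro"},"evaluateRule":{"sampleRatio":1,"rules":"$source.uid = $target.uid"}}"""
    val p = JsonUtil.fromJson[MiniUserParam](json)
    p.name                      // "accu1"
    p.evaluateRule.sampleRatio  // 1.0
    // "source" has no counterpart in MiniUserParam and is silently dropped,
    // because the mapper disables FAIL_ON_UNKNOWN_PROPERTIES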
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala new file mode 100644 index 0000000..cf0c9b3 --- /dev/null +++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala @@ -0,0 +1,34 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.apache.griffin.measure.batch.config.params.env._ +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.log.Loggable +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +import scala.util.{Failure, Success} + + +@RunWith(classOf[JUnitRunner]) +class ParamFileReaderTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + test("test file reader") { + val userReader = ParamFileReader("src/test/resources/config1.json") + val envReader = ParamFileReader("src/test/resources/env1.json") + + val p1 = userReader.readConfig[UserParam] + val p2 = envReader.readConfig[EnvParam] + + p1 match { + case Success(v) => println(v) + case Failure(ex) => error(ex.getMessage) + } + + p2 match { + case Success(v) => println(v) + case Failure(ex) => error(ex.getMessage) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala new file mode 100644 index 0000000..4715bcf --- /dev/null +++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala @@ -0,0 +1,74 @@ +package org.apache.griffin.measure.batch.rule + +import org.apache.griffin.measure.batch.log.Loggable +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + + +@RunWith(classOf[JUnitRunner]) +class RuleParserTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + +// test("test rule parser") { +// val ruleParser = RuleParser() +// +//// val rules = "outTime = 24h; @Invalid ${source}['__time'] + ${outTime} > ${target}['__time']" +// val rules = "@Key ${source}.json()['seeds'][*].json()['metadata'].json()['tracker']['crawlRequestCreateTS'] === ${target}.json()['groups'][0][\"attrsList\"]['name'=\"CRAWLMETADATA\"]['values'][0].json()['tracker']['crawlRequestCreateTS']" +//// val rules = "${source}['__time'] + ${outTime} > ${target}['__time']" +//// val rules = "${source}['__time'] > ${target}['__time']" +//// val rules = "432" +//// val rules = "${target}.json()['groups'][0]['attrsList']['name'='URL']['values'][0]" +// +// val result = ruleParser.parseAll(ruleParser.statementsExpr, rules) +// +// println(result) +// } + +// test("treat escape") { +// val es = """Hello\tworld\nmy name is \"ABC\"""" +// val un = StringContext treatEscapes es +// +// 
println(es) +// println(un) +// } + + test("test rule parser") { + val ruleParser = RuleParser() + +// val rules = "$SOUrce['tgt' < $source['tag' != 2] - -+-++---1] between ( -$target['32a'] + 9, 100, ----1000 ) and (45 > 9 or $target.type + 8 == 9 and $source['a'] >= 0) when not not not not $source._time + 24h < $target._time" + val rules = "$source['aaaf fd', (21, 43), '12']" + + val result = ruleParser.parseAll(ruleParser.selection, rules) + + println(result) + } + + test("test rule analyzer") { + val ruleParser = RuleParser() + +// val rules = "$source.tag == $target['take' >= 5] and $source.price + $source.price1 > $target['kk' < $target.age] and $source.ee = $target.fe + $target.a when $target.ggg = 1" + val rules = "$source.tag = $target.tag WHEN true" + val result = ruleParser.parseAll(ruleParser.rule, rules) + println(result) + + if (result.successful) { + val ruleAnalyzer = RuleAnalyzer(result.get) + + println("source") + ruleAnalyzer.sourceCacheExprs.foreach(a => println(a.desc)) + println("source final") + ruleAnalyzer.sourceFinalCacheExprs.foreach(a => println(a.desc)) + println("target") + ruleAnalyzer.targetCacheExprs.foreach(a => println(a.desc)) + println("target final") + ruleAnalyzer.targetFinalCacheExprs.foreach(a => println(a.desc)) + println("groupby") + ruleAnalyzer.sourceGroupbyExprs.foreach(println) + ruleAnalyzer.targetGroupbyExprs.foreach(println) + } + + + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/pom.xml ---------------------------------------------------------------------- diff --git a/measure/pom.xml b/measure/pom.xml new file mode 100644 index 0000000..7ed7f22 --- /dev/null +++ b/measure/pom.xml @@ -0,0 +1,193 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.griffin</groupId> + <artifactId>measure</artifactId> + <version>0.1.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>measure-batch</module> + </modules> + + <name>Apache Griffin :: Measures</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + + <scala.version>2.10.6</scala.version> + <spark.version>1.6.0</spark.version> + <scala.binary.version>2.10</scala.binary.version> + + <avro.version>1.7.7</avro.version> + <jackson.version>2.8.7</jackson.version> + <scalaj.version>2.3.0</scalaj.version> + <junit.version>4.11</junit.version> + <scalatest.version>2.2.4</scalatest.version> + <slf4j.version>1.7.21</slf4j.version> + <log4j.version>1.2.16</log4j.version> + </properties> + + <dependencies> + <!--scala--> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + + <!--spark, spark streaming, spark hive--> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + 
<artifactId>spark-hive_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + + <!--jackson--> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> + <version>${jackson.version}</version> + </dependency> + + <!--scalaj for http request--> + <dependency> + <groupId>org.scalaj</groupId> + <artifactId>scalaj-http_${scala.binary.version}</artifactId> + <version>${scalaj.version}</version> + </dependency> + + <!--avro--> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-avro_${scala.binary.version}</artifactId> + <version>2.0.1</version> + </dependency> + <!--csv--> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-csv_${scala.binary.version}</artifactId> + <version>1.5.0</version> + </dependency> + + <!--log4j--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-simple</artifactId>--> + <!--<version>${slf4j.version}</version>--> + <!--</dependency>--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>${log4j.version}</version> + </dependency> + + <!--junit--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <!--scala test--> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>${scalatest.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + </executions> + </plugin> + <!--<plugin>--> + <!--<artifactId>maven-assembly-plugin</artifactId>--> + <!--<configuration>--> + <!--<descriptorRefs>--> + <!--<descriptorRef>jar-with-dependencies</descriptorRef>--> + <!--</descriptorRefs>--> + <!--</configuration>--> + <!--<executions>--> + <!--<execution>--> + <!--<id>make-assembly</id>--> + <!--<phase>package</phase>--> + <!--<goals>--> + <!--<goal>single</goal>--> + <!--</goals>--> + <!--</execution>--> + <!--</executions>--> + <!--</plugin>--> + </plugins> + </build> +</project> \ No newline at end of file
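Closing the loop with env.json above: its two persist types, "hdfs" and "http", line up with the HdfsUtil and HttpUtil objects added in this commit. A hedged sketch of how one metric record could be pushed down each path; the directory, endpoint and payload below are illustrative stand-ins, not values from the commit:

    import org.apache.griffin.measure.batch.utils.{HdfsUtil, HttpUtil}

    val payload = """{"name":"accu1","matchPercentage":99.8,"total":50}"""

    // hdfs persist: resolve a file under the configured directory and append to it;
    // appendOrCreateFile relies on the dfs.support.append flag set in HdfsUtil
    val file = HdfsUtil.getHdfsFilePath("hdfs:///griffin/streaming/persist", "accu1")
    HdfsUtil.appendContent(file, payload + "\n")

    // http persist: post the record to the configured api; the return value is the
    // HTTP status code rendered as a string
    val status = HttpUtil.httpRequest(
      "http://example.com:8080/",          // stand-in for the api value in env.json
      "post",                              // matched case-insensitively by POST_REGEX
      Map[String, Object](),
      Map[String, Object]("Content-Type" -> "application/json"),
      payload)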
