http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala new file mode 100644 index 0000000..c29c072 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala @@ -0,0 +1,115 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.write + +import org.apache.griffin.measure.configuration.enums._ +import org.apache.griffin.measure.context.DQContext +import org.apache.griffin.measure.step.builder.ConstantColumns +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.griffin.measure.utils.ParamUtil._ + +/** + * write metrics into context metric wrapper + */ +case class MetricWriteStep(name: String, + inputName: String, + collectType: NormalizeType, + writeTimestampOpt: Option[Long] = None + ) extends WriteStep { + + val emptyMetricMap = Map[Long, Map[String, Any]]() + val emptyMap = Map[String, Any]() + + def execute(context: DQContext): Boolean = { + val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp) + + // get metric list from data frame + val metricMaps: Seq[Map[String, Any]] = getMetricMaps(context) + + // get timestamp and normalize metric + val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode) + val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match { + case SimpleMode => { + println(metricMaps) + val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType) + emptyMetricMap + (timestamp -> metrics) + } + case TimestampMode => { + val tmstMetrics = metricMaps.map { metric => + val tmst = metric.getLong(ConstantColumns.tmst, timestamp) + val pureMetric = metric.removeKeys(ConstantColumns.columns) + (tmst, pureMetric) + } + tmstMetrics.groupBy(_._1).map { pair => + val (k, v) = pair + val maps = v.map(_._2) + val mtc = normalizeMetric(maps, name, collectType) + (k, mtc) + } + } + } + + // write to metric wrapper + timestampMetricMap.foreach { pair => + val (timestamp, v) = pair + context.metricWrapper.insertMetric(timestamp, v) + } + + true + } + + private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = { + try { + val pdf = context.sqlContext.table(s"`${inputName}`") + val records = pdf.toJSON.collect() + if (records.size > 0) { + records.flatMap { rec => + try { + val value = JsonUtil.toAnyMap(rec) + Some(value) + } catch { + case e: Throwable => None + } + }.toSeq + } else Nil + } catch { + case e: Throwable => { + error(s"get metric ${name} fails") + Nil + } + } + } + + private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String, collectType: NormalizeType + ): Map[String, Any] = { + collectType match { + case EntriesNormalizeType => metrics.headOption.getOrElse(emptyMap) + case ArrayNormalizeType => Map[String, Any]((name -> metrics)) + case MapNormalizeType => { + val v = metrics.headOption.getOrElse(emptyMap) + Map[String, Any]((name -> v)) + } + case _ => { + if (metrics.size > 1) Map[String, Any]((name -> metrics)) + else metrics.headOption.getOrElse(emptyMap) + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala new file mode 100644 index 0000000..13b7f80 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala @@ -0,0 +1,151 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.write + +import org.apache.griffin.measure.configuration.enums._ +import org.apache.griffin.measure.context.DQContext +import org.apache.griffin.measure.step.builder.ConstantColumns +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ + +/** + * write records needs to be persisted + */ +case class RecordWriteStep(name: String, + inputName: String, + filterTableNameOpt: Option[String] = None, + writeTimestampOpt: Option[Long] = None + ) extends WriteStep { + + def execute(context: DQContext): Boolean = { + val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp) + + val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode) + writeMode match { + case SimpleMode => { + // batch records + val recordsOpt = collectBatchRecords(context) + // write records + recordsOpt match { + case Some(records) => { + context.getPersist(timestamp).persistRecords(records, name) + } + case _ => {} + } + } + case TimestampMode => { + // streaming records + val (recordsOpt, emptyTimestamps) = collectStreamingRecords(context) + // write records + recordsOpt.foreach { records => + records.foreach { pair => + val (t, strs) = pair + context.getPersist(t).persistRecords(strs, name) + } + } + emptyTimestamps.foreach { t => + context.getPersist(t).persistRecords(Nil, name) + } + } + } + true + } + + private def getTmst(row: Row, defTmst: Long): Long = { + try { + row.getAs[Long](ConstantColumns.tmst) + } catch { + case _: Throwable => defTmst + } + } + + private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = { + try { + val df = context.sqlContext.table(s"`${name}`") + Some(df) + } catch { + case e: Throwable => { + error(s"get data frame ${name} fails") + None + } + } + } + + private def getRecordDataFrame(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName) + + private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] = filterTableNameOpt.flatMap(getDataFrame(context, _)) + + private def collectBatchRecords(context: DQContext): Option[RDD[String]] = { + getRecordDataFrame(context).map(_.toJSON.rdd); + } + + private def collectStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = { + implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING) + val defTimestamp = context.contextId.timestamp + getRecordDataFrame(context) match { + case Some(df) => { + val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match { + case Some(filterDf) => { + // timestamps with empty flag + val tmsts: Array[(Long, Boolean)] = (filterDf.collect.flatMap { row => + try { + val tmst = getTmst(row, defTimestamp) + val empty = row.getAs[Boolean](ConstantColumns.empty) + Some((tmst, empty)) + } catch { + case _: Throwable => None + } + }) + val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet + val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet + val filterFuncOpt: Option[(Long) => Boolean] = if (recordTmsts.size > 0) { + Some((t: Long) => recordTmsts.contains(t)) + } else None + + (filterFuncOpt, emptyTmsts) + } + case _ => (Some((t: Long) => true), Set[Long]()) + } + + // filter timestamps need to record + filterFuncOpt match { + case Some(filterFunc) => { + val records = df.flatMap { row => + val tmst = getTmst(row, defTimestamp) + if (filterFunc(tmst)) { + try { + val map = SparkRowFormatter.formatRow(row) + val str = JsonUtil.toJson(map) + Some((tmst, str)) + } catch { + case e: Throwable => None + } + } else None + } + (Some(records.rdd.groupByKey), emptyTimestamps) + } + case _ => (None, emptyTimestamps) + } + } + case _ => (None, Set[Long]()) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala new file mode 100644 index 0000000..592c1d4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala @@ -0,0 +1,65 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.write + +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType} + +import scala.collection.mutable.ArrayBuffer + +/** + * spark row formatter + */ +object SparkRowFormatter { + + def formatRow(row: Row): Map[String, Any] = { + formatRowWithSchema(row, row.schema) + } + + private def formatRowWithSchema(row: Row, schema: StructType): Map[String, Any] = { + formatStruct(schema.fields, row) + } + + private def formatStruct(schema: Seq[StructField], r: Row) = { + val paired = schema.zip(r.toSeq) + paired.foldLeft(Map[String, Any]())((s, p) => s ++ formatItem(p)) + } + + private def formatItem(p: Tuple2[StructField, Any]): Map[String, Any] = { + p match { + case (sf, a) => + sf.dataType match { + case ArrayType(et, _) => + Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]]))) + case StructType(s) => + Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row]))) + case _ => Map(sf.name -> a) + } + } + } + + private def formatArray(et: DataType, arr: ArrayBuffer[Any]): Seq[Any] = { + et match { + case StructType(s) => arr.map(e => formatStruct(s, e.asInstanceOf[Row])) + case ArrayType(t, _) => + arr.map(e => formatArray(t, e.asInstanceOf[ArrayBuffer[Any]])) + case _ => arr + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala new file mode 100644 index 0000000..329975c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala @@ -0,0 +1,31 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.write + +import org.apache.griffin.measure.step.DQStep + +trait WriteStep extends DQStep { + + val inputName: String + + val writeTimestampOpt: Option[Long] + + override def getNames(): Seq[String] = Nil + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java b/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java deleted file mode 100644 index 6ab951b..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java +++ /dev/null @@ -1,74 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.measure.util; - - -import org.apache.griffin.measure.config.params.env.EmailParam; - -import javax.mail.*; -import javax.mail.internet.InternetAddress; -import javax.mail.internet.MimeBodyPart; -import javax.mail.internet.MimeMessage; -import javax.mail.internet.MimeMultipart; -import java.util.Properties; - -/** - * Created by xiaoqiu.duan on 2017/9/11. - */ -public class MailUtil { - - public static void SendMail(String name, String UserArr, String Title, String Info, EmailParam emailParam) throws MessagingException { - Properties prop = new Properties(); - prop.setProperty("mail.transport.protocol", "smtp"); - prop.setProperty("mail.smtp.host", emailParam.host()); - prop.setProperty("mail.smtp.auth", "NTLM"); - prop.setProperty("mail.debug", "true"); - Session session = Session.getInstance(prop); - Message msg = new MimeMessage(session); - msg.setFrom(new InternetAddress(emailParam.mail())); - //msg.setRecipient(Message.RecipientType.TO, new InternetAddress(UserArr)); - String[] arr = null; - if (!UserArr.contains(",")) { - arr = new String[]{UserArr}; - } else { - arr = UserArr.split(","); - } - Address[] tos = new InternetAddress[arr.length]; - for (int i = 0; i < arr.length; i++) { - tos[i] = new InternetAddress(arr[i]); - } - msg.setRecipients(Message.RecipientType.TO, tos); - msg.setSubject(Title); - Multipart mainPart = new MimeMultipart(); - BodyPart html = new MimeBodyPart(); - html.setContent(Info, "text/html; charset=utf-8"); - mainPart.addBodyPart(html); - msg.setContent(mainPart); - msg.addHeader("X-Priority", "3"); - msg.addHeader("X-MSMail-Priority", "Normal"); - msg.addHeader("X-Mailer", "Microsoft Outlook Express 6.00.2900.2869"); - msg.addHeader("X-MimeOLE", "Produced By Microsoft MimeOLE V6.00.2900.2869"); - msg.addHeader("ReturnReceipt", "1"); - Transport trans = session.getTransport(); - trans.connect(emailParam.usr(), emailParam.pwd()); - trans.sendMessage(msg, msg.getAllRecipients()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java b/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java deleted file mode 100644 index 0782510..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java +++ /dev/null @@ -1,56 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.measure.util; - -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - -public class Md5Util { - - private static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', - 'e', 'f' }; - - private Md5Util() { - } - - public static String md5(String s) { - - try { - byte[] bytes = s.getBytes(); - - MessageDigest messageDigest = MessageDigest.getInstance("MD5"); - messageDigest.update(bytes); - byte[] mdBytes = messageDigest.digest(); - int j = mdBytes.length; - char[] str = new char[j * 2]; - int k = 0; - for (int i = 0; i < j; i++) { - byte byte0 = mdBytes[i]; - str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf]; - str[k++] = HEX_DIGITS[byte0 & 0xf]; - } - return new String(str); - } catch (NoSuchAlgorithmException ex) { - - return null; - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java deleted file mode 100644 index 4f15b62..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java +++ /dev/null @@ -1,178 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -//package org.apache.griffin.measure.util; -// -//import java.io.IOException; -// -//import org.apache.commons.lang.StringUtils; -//import java.util.ArrayList; -//import java.io.*; -//import java.io.BufferedReader; -//import java.io.InputStreamReader; -//import java.net.*; -//import java.util.*; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -// -// -//import org.apache.griffin.measure.config.params.env.SMSParam; -//import org.json.JSONException; -//import org.json.JSONObject; -// -///** -// * Created by xiaoqiu.duan on 2017/9/11. -// */ -//public class MessageUtil { -// -// -// public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){ -// String url=smsParam.host(); -// String SYS_ID=smsParam.id(); -// String KEY=smsParam.key(); -// String sendContext="["+smsParam.uuid()+"]: "+content ; -// System.out.println(" sendContext: "+sendContext); -// Long timestamp=new Date().getTime()+20610; -// System.out.println(" timestamp: "+timestamp); -// String[] tels=teamPhone.split(",") ; -// String uuid = UUID.randomUUID().toString().replaceAll("-", ""); -// Map<String,Object> param=new HashMap<String, Object>(); -// param.put("apportionment","JR_DATA_ALARM"); -// param.put("event",0); -// param.put("eventTime",0); -// param.put("isMasked",false); -// param.put("originator","ETC"); -// param.put("reqId",uuid); -// param.put("schemaKey",""); -// param.put("sysId",SYS_ID); -// param.put("template",sendContext); -// param.put("templateId",""); -// param.put("timestamp",timestamp); -// param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY)); -// param.put("typeCode","JR_DATA_ALARM"); -// System.out.println("params: "+param); -// List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>(); -// for (int i=0;i<tels.length;i++) { -// Map<String, Object> body = new HashMap<String, Object>(); -// body.put("phoneNo", tels[i]); -// body.put("params", ""); -// body.put("userId", 0); -// bodys.add(body); -// } -// System.out.println("bodys: "+bodys); -// JSONObject jsonParam = new JSONObject(); -// try { -// jsonParam.put("params",param); -// jsonParam.put("bodys",bodys); -// System.out.println("jsonParam: "+jsonParam); -// System.out.println("jsonParam: "+jsonParam.toString()); -// } catch (JSONException e) { -// e.printStackTrace(); -// } -// URL u=null; -// int smsnum=0; -// HttpURLConnection connection=null; -// try{ -// String result= postRequestUrl(url, jsonParam.toString(), "utf-8"); -// return "send success"; -// }catch(Exception e){ -// e.printStackTrace(); -// return null; -// } -// -// } -// -// -// public static String postRequestUrl(String url, String param,String encode) { -// OutputStreamWriter out = null; -// BufferedReader reader = null; -// String response=""; -// try { -// URL httpUrl = null; -// httpUrl = new URL(url); -// HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection(); -// conn.setRequestMethod("POST"); -// conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded -// conn.setRequestProperty("connection", "keep-alive"); -// conn.setUseCaches(false); -// conn.setInstanceFollowRedirects(true); -// conn.setDoOutput(true); -// conn.setDoInput(true); -// conn.connect(); -// //POST -// out = new OutputStreamWriter( -// conn.getOutputStream()); -// out.write(param); -// -// out.flush(); -// -// System.out.println("send POST "+conn.getResponseCode()); -// reader = new BufferedReader(new InputStreamReader( -// conn.getInputStream())); -// -// String lines; -// while ((lines = reader.readLine()) != null) { -// lines = new String(lines.getBytes(), "utf-8"); -// response+=lines; -// System.out.println("lines: "+lines); -// } -// reader.close(); -// conn.disconnect(); -// -// //log.info(response.toString()); -// } catch (Exception e) { -// System.out.println("send POST errorï¼"+e); -// e.printStackTrace(); -// } -// finally{ -// try{ -// if(out!=null){ -// out.close(); -// } -// if(reader!=null){ -// reader.close(); -// } -// } -// catch(IOException ex){ -// ex.printStackTrace(); -// } -// } -// -// return response; -// } -// -// -// -// private static String readBufferedContent(BufferedReader bufferedReader) { -// if (bufferedReader == null) -// return null; -// StringBuffer result = new StringBuffer(); -// String line = null; -// try { -// while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) { -// result.append(line+"\r\n"); -// } -// } catch (IOException e) { -// e.printStackTrace(); -// return null; -// } -// return result.toString(); -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala index 9390160..1405c49 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala @@ -35,7 +35,7 @@ object DataFrameUtil { def unionByName(a: DataFrame, b: DataFrame): DataFrame = { val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq - a.select(columns: _*).unionAll(b.select(columns: _*)) + a.select(columns: _*).union(b.select(columns: _*)) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala new file mode 100644 index 0000000..023a138 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala @@ -0,0 +1,82 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.utils + +import java.io.File +import java.net.URI + +import org.apache.griffin.measure.Loggable +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem + +import scala.collection.mutable.{Map => MutableMap} + +object FSUtil extends Loggable { + + private val fsMap: MutableMap[String, FileSystem] = MutableMap() + private val defaultFS: FileSystem = FileSystem.get(getConfiguration) + + def getFileSystem(path: String): FileSystem = { + getUriOpt(path) match { + case Some(uri) => { + fsMap.get(uri.getScheme) match { + case Some(fs) => fs + case _ => { + val fs = try { + FileSystem.get(uri, getConfiguration) + } catch { + case e: Throwable => { + error(s"get file system error: ${e.getMessage}") + throw e + } + } + fsMap += (uri.getScheme -> fs) + fs + } + } + } + case _ => defaultFS + } + } + + private def getConfiguration(): Configuration = { + val conf = new Configuration() + conf.setBoolean("dfs.support.append", true) +// conf.set("fs.defaultFS", "hdfs://localhost") // debug in hdfs localhost env + conf + } + + private def getUriOpt(path: String): Option[URI] = { + val uriOpt = try { + Some(new URI(path)) + } catch { + case e: Throwable => None + } + uriOpt.flatMap { uri => + if (uri.getScheme == null) { + try { + Some(new File(path).toURI) + } catch { + case e: Throwable => None + } + } else Some(uri) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala deleted file mode 100644 index 8e0d9a3..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.utils - -import org.apache.spark.TaskContext -import org.apache.spark.rdd.RDD - -object HdfsFileDumpUtil { - - val sepCount = 50000 - - private def suffix(i: Long): String = { - if (i == 0) "" else s".${i}" - } - private def samePattern(fileName: String, patternFileName: String): Boolean = { - fileName.startsWith(patternFileName) - } - - def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = { -// val indexRdd = rdd.zipWithIndex // slow process -// indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey() // slow process - val count = rdd.count - val splitCount = count / sepCount + 1 - val splitRdd = rdd.mapPartitionsWithIndex { (n, itr) => - val idx = n % splitCount - itr.map((idx, _)) - } - splitRdd.groupByKey() - } - def splitIterable[T](datas: Iterable[T])(implicit m: Manifest[T]): Iterator[(Int, Iterable[T])] = { - val groupedData = datas.grouped(sepCount).zipWithIndex - groupedData.map(v => (v._2, v._1)) - } - - private def directDump(path: String, list: Iterable[String], lineSep: String): Unit = { - // collect and save - val strRecords = list.mkString(lineSep) - // save into hdfs - HdfsUtil.writeContent(path, strRecords) - } - - def dump(path: String, recordsRdd: RDD[String], lineSep: String): Unit = { - val groupedRdd = splitRdd(recordsRdd) - groupedRdd.foreach { pair => - val (idx, list) = pair - val filePath = path + suffix(idx) - directDump(filePath, list, lineSep) - } -// groupedRdd.aggregate(true)({ (res, pair) => -// val (idx, list) = pair -// val filePath = path + suffix(idx) -// directDump(filePath, list, lineSep) -// true -// }, _ && _) - } - def dump(path: String, records: Iterable[String], lineSep: String): Unit = { - val groupedRecords = splitIterable(records) - groupedRecords.foreach { pair => - val (idx, list) = pair - val filePath = path + suffix(idx) - directDump(filePath, list, lineSep) - } -// groupedRecords.aggregate(true)({ (res, pair) => -// val (idx, list) = pair -// val filePath = path + suffix(idx) -// directDump(filePath, list, lineSep) -// true -// }, _ && _) - } - - def remove(path: String, filename: String, withSuffix: Boolean): Unit = { - if (withSuffix) { - val files = HdfsUtil.listSubPathsByType(path, "file") - val patternFiles = files.filter(samePattern(_, filename)) - patternFiles.foreach { f => - val rmPath = HdfsUtil.getHdfsFilePath(path, f) - HdfsUtil.deleteHdfsPath(rmPath) - } - } else { - val rmPath = HdfsUtil.getHdfsFilePath(path, filename) - HdfsUtil.deleteHdfsPath(rmPath) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala index aa5643b..89f505a 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala @@ -18,24 +18,19 @@ under the License. */ package org.apache.griffin.measure.utils -import org.apache.griffin.measure.log.Loggable -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} +import org.apache.griffin.measure.Loggable +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} object HdfsUtil extends Loggable { private val seprator = "/" - private val conf = new Configuration() - conf.setBoolean("dfs.support.append", true) -// conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost - - private val dfs = FileSystem.get(conf) + private def getFS(implicit path: Path) = FSUtil.getFileSystem(path.toString) def existPath(filePath: String): Boolean = { try { - val path = new Path(filePath) - dfs.exists(path) + implicit val path = new Path(filePath) + getFS.exists(path) } catch { case e: Throwable => false } @@ -47,21 +42,21 @@ object HdfsUtil extends Loggable { } def createFile(filePath: String): FSDataOutputStream = { - val path = new Path(filePath) - if (dfs.exists(path)) dfs.delete(path, true) - return dfs.create(path) + implicit val path = new Path(filePath) + if (getFS.exists(path)) getFS.delete(path, true) + return getFS.create(path) } def appendOrCreateFile(filePath: String): FSDataOutputStream = { - val path = new Path(filePath) - if (dfs.getConf.getBoolean("dfs.support.append", false) && dfs.exists(path)) { - dfs.append(path) + implicit val path = new Path(filePath) + if (getFS.getConf.getBoolean("dfs.support.append", false) && getFS.exists(path)) { + getFS.append(path) } else createFile(filePath) } def openFile(filePath: String): FSDataInputStream = { - val path = new Path(filePath) - dfs.open(path) + implicit val path = new Path(filePath) + getFS.open(path) } def writeContent(filePath: String, message: String): Unit = { @@ -88,35 +83,35 @@ object HdfsUtil extends Loggable { def deleteHdfsPath(dirPath: String): Unit = { try { - val path = new Path(dirPath) - if (dfs.exists(path)) dfs.delete(path, true) + implicit val path = new Path(dirPath) + if (getFS.exists(path)) getFS.delete(path, true) } catch { case e: Throwable => error(s"delete path [${dirPath}] error: ${e.getMessage}") } } -// def listPathFiles(dirPath: String): Iterable[String] = { -// val path = new Path(dirPath) -// try { -// val fileStatusArray = dfs.listStatus(path) -// fileStatusArray.flatMap { fileStatus => -// if (fileStatus.isFile) { -// Some(fileStatus.getPath.getName) -// } else None -// } -// } catch { -// case e: Throwable => { -// println(s"list path files error: ${e.getMessage}") -// Nil -// } -// } -// } + // def listPathFiles(dirPath: String): Iterable[String] = { + // val path = new Path(dirPath) + // try { + // val fileStatusArray = dfs.listStatus(path) + // fileStatusArray.flatMap { fileStatus => + // if (fileStatus.isFile) { + // Some(fileStatus.getPath.getName) + // } else None + // } + // } catch { + // case e: Throwable => { + // println(s"list path files error: ${e.getMessage}") + // Nil + // } + // } + // } def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = { if (existPath(dirPath)) { try { - val path = new Path(dirPath) - val fileStatusArray = dfs.listStatus(path) + implicit val path = new Path(dirPath) + val fileStatusArray = getFS.listStatus(path) fileStatusArray.filter { fileStatus => subType match { case "dir" => fileStatus.isDirectory http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala index d125d87..3f05499 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala @@ -47,6 +47,18 @@ object ParamUtil { } } + def getLazyString(key: String, defValue: () => String): String = { + try { + params.get(key) match { + case Some(v: String) => v.toString + case Some(v) => v.toString + case _ => defValue() + } + } catch { + case _: Throwable => defValue() + } + } + def getStringOrKey(key: String): String = getString(key, key) def getByte(key: String, defValue: Byte): Byte = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala index 9b4d58e..e96cbb1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala @@ -18,7 +18,7 @@ under the License. */ package org.apache.griffin.measure.utils -import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.Loggable import scala.util.matching.Regex import scala.util.{Failure, Success, Try} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json deleted file mode 100644 index 10167cd..0000000 --- a/measure/src/test/resources/_accuracy-batch-griffindsl.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "name": "accu_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_target.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "accuracy", - "name": "accu", - "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", - "details": { - "source": "source", - "target": "target", - "miss": "miss_count", - "total": "total_count", - "matched": "matched_count" - }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json b/measure/src/test/resources/_accuracy-batch-sparksql.json deleted file mode 100644 index 2eef9f1..0000000 --- a/measure/src/test/resources/_accuracy-batch-sparksql.json +++ /dev/null @@ -1,63 +0,0 @@ -{ - "name": "accu_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_target.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "missRecords", - "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.post_code IS NULL)", - "record": { - "name": "miss" - } - }, - { - "dsl.type": "spark-sql", - "name": "miss_count", - "rule": "SELECT count(*) as miss FROM `missRecords`" - }, - { - "dsl.type": "spark-sql", - "name": "total_count", - "rule": "SELECT count(*) as total FROM source" - }, - { - "dsl.type": "spark-sql", - "name": "accu", - "rule": "SELECT `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` FROM `total_count` FULL JOIN `miss_count`", - "metric": { - "name": "accu" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json deleted file mode 100644 index 240d768..0000000 --- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json +++ /dev/null @@ -1,121 +0,0 @@ -{ - "name": "accu_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "type": "parquet", - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"], - "init.clear": true, - "updatable": true - } - }, { - "name": "target", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "ttt", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${t1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${t1}" - } - ] - } - ], - "cache": { - "type": "parquet", - "file.path": "hdfs://localhost/griffin/streaming/dump/target", - "info.path": "target", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"], - "init.clear": true - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "accuracy", - "name": "accu", - "rule": "source.name = target.name and source.age = target.age", - "details": { - "source": "source", - "target": "target", - "miss": "miss_count", - "total": "total_count", - "matched": "matched_count" - }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } - } - ] - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-sparksql.json b/measure/src/test/resources/_accuracy-streaming-sparksql.json deleted file mode 100644 index 0824cb8..0000000 --- a/measure/src/test/resources/_accuracy-streaming-sparksql.json +++ /dev/null @@ -1,142 +0,0 @@ -{ - "name": "accu_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"] - } - }, { - "name": "target", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "ttt", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${t1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${t1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/target", - "info.path": "target", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "missRecords", - "cache": true, - "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.name, '') = coalesce(target.name, '') AND coalesce(source.age, '') = coalesce(target.age, '') WHERE (NOT (source.name IS NULL AND source.age IS NULL)) AND (target.name IS NULL AND target.age IS NULL)" - }, - { - "dsl.type": "spark-sql", - "name": "miss_count", - "rule": "SELECT `__tmst`, count(*) as miss FROM `missRecords` GROUP BY `__tmst`" - }, - { - "dsl.type": "spark-sql", - "name": "total_count", - "rule": "SELECT `__tmst`, count(*) as total FROM source GROUP BY `__tmst`" - }, - { - "dsl.type": "spark-sql", - "name": "accu", - "rule": "SELECT `total_count`.`__tmst` AS `__tmst`, `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss` FROM `total_count` FULL JOIN `miss_count` ON `total_count`.`__tmst` = `miss_count`.`__tmst`" - }, - { - "dsl.type": "df-opr", - "name": "metric_accu", - "rule": "accuracy", - "details": { - "df.name": "accu", - "miss": "miss", - "total": "total", - "matched": "matched" - }, - "metric": { - "name": "accuracy" - } - }, - { - "dsl.type": "spark-sql", - "name": "accu_miss_records", - "rule": "SELECT `__tmst`, `__empty` FROM `metric_accu` WHERE `__record`", - "record": { - "name": "missRecords", - "data.source.cache": "source", - "origin.DF": "missRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_completeness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json deleted file mode 100644 index 9c00444..0000000 --- a/measure/src/test/resources/_completeness-batch-griffindsl.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "comp_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "completeness", - "name": "comp", - "rule": "email, post_code, first_name", - "metric": { - "name": "comp" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_completeness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json deleted file mode 100644 index ba8bdce..0000000 --- a/measure/src/test/resources/_completeness-streaming-griffindsl.json +++ /dev/null @@ -1,65 +0,0 @@ -{ - "name": "comp_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "source", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "test", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"], - "init.clear": true - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "completeness", - "name": "comp", - "rule": "name, age", - "metric": { - "name": "comp" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json deleted file mode 100644 index af0c91e..0000000 --- a/measure/src/test/resources/_distinctness-batch-griffindsl.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "name": "dist_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - }, - { - "name": "target", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "distinct", - "name": "dist", - "rule": "user_id", - "details": { - "source": "source", - "target": "target", - "total": "total", - "distinct": "distinct", - "dup": "dup", - "num": "num", - "duplication.array": "dup" - }, - "metric": { - "name": "distinct" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-batch-griffindsl1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json deleted file mode 100644 index 4d94d8e..0000000 --- a/measure/src/test/resources/_distinctness-batch-griffindsl1.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "name": "dist_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${this}" - } - ] - } - ] - }, - { - "name": "target", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select DISTINCT name, age from ${this}" - } - ] - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "distinct", - "name": "dist", - "rule": "name", - "details": { - "source": "source", - "target": "target", - "total": "total", - "distinct": "distinct", - "dup": "dup", - "num": "num", - "duplication.array": "dup" - }, - "metric": { - "name": "distinct" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-batch-griffindsl2.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json b/measure/src/test/resources/_distinctness-batch-griffindsl2.json deleted file mode 100644 index 6a12719..0000000 --- a/measure/src/test/resources/_distinctness-batch-griffindsl2.json +++ /dev/null @@ -1,74 +0,0 @@ -{ - "name": "dist_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${this}" - } - ] - } - ] - }, - { - "name": "target", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select DISTINCT name, age from ${this}" - } - ] - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "distinct", - "name": "dist", - "rule": "name, [age]", - "details": { - "source": "source", - "target": "target", - "total": "total", - "distinct": "distinct", - "dup": "dup", - "num": "num", - "duplication.array": "dup", - "record.enable": true - }, - "metric": { - "name": "distinct" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json deleted file mode 100644 index c36e7ba..0000000 --- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json +++ /dev/null @@ -1,85 +0,0 @@ -{ - "name": "dist_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "new", - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/old", - "info.path": "new", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"], - "read.only": true - } - }, - { - "name": "old", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "old", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "ttt", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/old", - "info.path": "old", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-24h", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "distinct", - "name": "dist", - "rule": "name, age", - "details": { - "source": "new", - "target": "old", - "total": "total", - "distinct": "distinct", - "dup": "dup", - "accu_dup": "accu_dup", - "num": "num", - "duplication.array": "dup" - }, - "metric": { - "name": "distinct" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-batch-griffindsl-hive.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json deleted file mode 100644 index 03b0405..0000000 --- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "name": "prof_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "hive", - "version": "1.2", - "config": { - "database": "default", - "table.name": "s1" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "prof", - "rule": "name, count(*) as cnt from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } - }, - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "grp", - "rule": "age, count(*) as cnt from source group by age order by cnt", - "metric": { - "name": "age_group", - "collect.type": "array" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json deleted file mode 100644 index ec082c4..0000000 --- a/measure/src/test/resources/_profiling-batch-griffindsl.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "name": "prof_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select reg_replace(email, '^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from ${this}" - } - ] - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "prof", - "rule": "email, count(*) as cnt from source group by email", - "metric": { - "name": "prof", - "collect.type": "array" - } - }, - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "grp", - "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc", - "metric": { - "name": "post_group", - "collect.type": "array" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json deleted file mode 100644 index fdfd812..0000000 --- a/measure/src/test/resources/_profiling-batch-sparksql.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "name": "prof_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "prof", - "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source", - "metric": { - "name": "prof" - } - }, - { - "dsl.type": "spark-sql", - "name": "grp", - "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code", - "metric": { - "name": "post_group", - "collect.type": "array" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json deleted file mode 100644 index b6feb5a..0000000 --- a/measure/src/test/resources/_profiling-streaming-griffindsl.json +++ /dev/null @@ -1,75 +0,0 @@ -{ - "name": "prof_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "test", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"], - "init.clear": true - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "prof", - "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", - "metric": { - "name": "prof" - } - }, - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "grp", - "rule": "select name, count(*) as `cnt` from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-sparksql.json b/measure/src/test/resources/_profiling-streaming-sparksql.json deleted file mode 100644 index 4f0b0ee..0000000 --- a/measure/src/test/resources/_profiling-streaming-sparksql.json +++ /dev/null @@ -1,80 +0,0 @@ -{ - "name": "prof_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "prof", - "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", - "metric": { - "name": "prof" - } - }, - { - "dsl.type": "spark-sql", - "name": "grp", - "rule": "select name, count(*) as `cnt` from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } - }, - { - "dsl.type": "spark-sql", - "name": "tmst_grp", - "rule": "select `__tmst`, count(*) as `cnt` from source group by `__tmst`", - "metric": { - "name": "tmst_group" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json deleted file mode 100644 index 90439df..0000000 --- a/measure/src/test/resources/_timeliness-batch-griffindsl.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "name": "timeliness_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/timeliness_data.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "timeliness", - "name": "timeliness", - "rule": "ts, end_ts", - "details": { - "source": "source", - "latency": "latency", - "total": "total", - "avg": "avg", - "threshold": "3m", - "step": "step", - "count": "cnt", - "step.size": "2m", - "percentile": "percentile", - "percentile.values": [0.95] - }, - "metric": { - "name": "timeliness" - }, - "record": { - "name": "lateRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-sparksql.json b/measure/src/test/resources/_timeliness-batch-sparksql.json deleted file mode 100644 index f9cb368..0000000 --- a/measure/src/test/resources/_timeliness-batch-sparksql.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "name": "timeliness_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/timeliness_data.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "in_time", - "rule": "select *, (ts) as `_in_ts`, (end_ts) as `_out_ts` from source where (ts) IS NOT NULL" - }, - { - "dsl.type": "spark-sql", - "name": "lat", - "cache": true, - "rule": "select *, (`_out_ts` - `_in_ts`) as `latency` from `in_time`" - }, - { - "dsl.type": "spark-sql", - "name": "metric", - "rule": "select cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`", - "metric": { - "name": "timeliness" - } - }, - { - "dsl.type": "spark-sql", - "name": "slows", - "rule": "select * from `lat` where `latency` > 60000", - "record": { - "name": "lateRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json deleted file mode 100644 index 5916e5c..0000000 --- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json +++ /dev/null @@ -1,79 +0,0 @@ -{ - "name": "timeliness_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "fff", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select ts, end_ts, name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "timeliness", - "name": "timeliness", - "rule": "ts, end_ts", - "details": { - "source": "source", - "latency": "latency", - "total": "total", - "avg": "avg", - "threshold": "1h", - "step": "step", - "count": "cnt", - "step.size": "5m", - "percentile": "percentile", - "percentile.values": [0.2, 0.5, 0.8] - }, - "metric": { - "name": "timeliness" - }, - "record": { - "name": "lateRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-sparksql.json b/measure/src/test/resources/_timeliness-streaming-sparksql.json deleted file mode 100644 index dc736ab..0000000 --- a/measure/src/test/resources/_timeliness-streaming-sparksql.json +++ /dev/null @@ -1,82 +0,0 @@ -{ - "name": "timeliness_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "fff", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select ts, name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "in_time", - "rule": "select *, (ts) as `_in_ts` from source where (ts) IS NOT NULL" - }, - { - "dsl.type": "spark-sql", - "name": "lat", - "cache": true, - "rule": "select *, (`__tmst` - `_in_ts`) as `latency` from `in_time`" - }, - { - "dsl.type": "spark-sql", - "name": "metric", - "rule": "select `__tmst`, cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`", - "metric": { - "name": "timeliness" - } - }, - { - "dsl.type": "spark-sql", - "name": "slows", - "rule": "select * from `lat` where `latency` > 60000", - "record": { - "name": "lateRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_uniqueness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json deleted file mode 100644 index 28009e8..0000000 --- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json +++ /dev/null @@ -1,58 +0,0 @@ -{ - "name": "unique_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - }, - { - "name": "target", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "uniqueness", - "name": "dup", - "rule": "user_id", - "details": { - "source": "source", - "target": "target", - "total": "total", - "unique": "unique", - "dup": "dup", - "num": "num" - }, - "metric": { - "name": "unique" - }, - "record": { - "name": "dupRecords" - } - } - ] - } -} \ No newline at end of file
