Hi,

First, I need to say that Griffin measures data quality, and the results are 
always in metric format, so only the metrics are required to be persisted, 
and that's enough. For the mismatched records in accuracy measurements, 
persisting is optional; it just helps users see the gap between data sets 
easily. But in the big data world, data sets are usually large, e.g. 1GB/day; 
if the accuracy is 90%, the remaining 10% of mismatched records would be about 
100MB/day, which is still too large for most sinks. That's why we only 
implemented the sinkRecords method for HdfsSink by default.
1. It doesn't mean that Griffin strongly depends on HDFS; HdfsSink just helps 
persist the mismatched records in accuracy cases. Griffin can still work 
without the HdfsSink if you simply don't need the mismatched records in 
accuracy.
On the other hand, S3 could also serve this purpose; users can implement 
whatever sink they like (see the sketch after this list).
2. Furthermore, for smaller-sized storages, users can also implement the 
sinkRecords method to persist only a sample of the records if they'd like to, 
since persisting a large record set is not a good fit for sinks like ES or 
Mongo (the sketch below takes this approach).
3. The four metrics output types are just provided by default; they are not 
all required. Users can enable only some of them, or use other ones they 
implement themselves. There are no special requirements; users should choose 
the ones that best fit their specific environments.
Griffin has only implemented four common types of sinks; contributions of any 
other sinks would be much appreciated.
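
To make points 1 and 2 concrete, below is a minimal sketch of a custom sink 
that persists only a bounded sample of the mismatched records to an S3 path 
through the Hadoop FileSystem API (the s3a:// scheme needs the hadoop-aws jars 
on the classpath). The simplified trait, the "path" config key, the bucket 
name and the sample size are assumptions for illustration, not Griffin's 
actual code; the real trait Sink has more members.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

// simplified form of trait Sink, keeping only the methods discussed here
trait SimpleSink {
  def sinkRecords(records: RDD[String], name: String): Unit
  def sinkRecords(records: Iterable[String], name: String): Unit
  def sinkMetrics(metrics: Map[String, Any]): Unit
}

case class SampleS3Sink(metricName: String, timeStamp: Long,
                        config: Map[String, Any]) extends SimpleSink {

  // hypothetical config key, e.g. "path" -> "s3a://my-bucket/griffin"
  val basePath: String = config.getOrElse("path", "s3a://my-bucket/griffin").toString
  val maxRecords: Int = 1000 // assumed sample size, to keep the sink small

  def sinkRecords(records: RDD[String], name: String): Unit = {
    // take only a bounded sample, so a 100MB/day record set stays manageable
    sinkRecords(records.take(maxRecords).toIterable, name)
  }

  def sinkRecords(records: Iterable[String], name: String): Unit = {
    // one output file per measurement, timestamp and record set name
    val path = new Path(s"$basePath/$metricName/$timeStamp/$name")
    val fs = FileSystem.get(URI.create(path.toString), new Configuration())
    val out = fs.create(path, true)
    try records.foreach(r => out.write((r + "\n").getBytes("UTF-8")))
    finally out.close()
  }

  def sinkMetrics(metrics: Map[String, Any]): Unit = {
    // the metric is only a small JSON-like map, so any store could handle it
  }
}

Wiring such a sink into a job would still go through Griffin's sink 
configuration, which is beyond this sketch.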

Thanks
Lionel, Liu

From: 万昆
Sent: August 21, 2019 10:07
To: dev@griffin.apache.org
Cc: d...@griffin.incubator.apache.org
Subject: Re: RE: what is the measure sink records method used for

Thanks, Lionel.


1. If the result records can only be sunk into the HDFS filesystem, that means 
Griffin has a strong dependency on HDFS, and other systems like S3 cannot get 
the result records.
2. Could the metrics and records be sunk into one component like ElasticSearch 
or Mongo?
3. Why should we have four metrics output types? Are there any special 
requirements?


* flatten: Aggregation method used before sending the data frame result into 
the sink:
      - default: use "array" if the data frame returned multiple records, 
otherwise use "entries"
      - entries: sends the first row of the data frame as the metric results, 
like `{"agg_col": "value"}`
      - array: wraps all metrics into a map, like `{"my_out_name": [{"agg_col": 
"value"}]}`
      - map: wraps the first row of the data frame into a map, like 
`{"my_out_name": {"agg_col": "value"}}`

At 2019-08-20 21:24:37, "Lionel, Liu" <bhlx3l...@163.com> wrote:
>Hi,
>
>The sinkRecords method is not implemented in these kinds of sinks. You should 
>notice that there are two kinds of sink methods in trait Sink: the sinkRecords 
>methods persist the mismatched records of the original data set, which can be 
>large, while the sinkMetrics method persists the metric of a measurement, 
>which is only a simple JSON object.
>Therefore, all kinds of sinks implement the sinkMetrics method, but only the 
>HdfsSink class is able to persist the records. That's why, for the accuracy 
>measure, the missing records are only persisted in HDFS, not in the other 
>sinks.
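>
>For reference, a minimal sketch of that distinction in trait Sink (only these 
>two kinds of methods are shown; the real trait has more members, and the 
>sinkMetrics signature is an assumption):
>
>trait Sink {
>  // persist mismatched records of the original data set (potentially large)
>  def sinkRecords(records: RDD[String], name: String): Unit
>  def sinkRecords(records: Iterable[String], name: String): Unit
>  // persist the metric of a measurement (a small JSON-like map)
>  def sinkMetrics(metrics: Map[String, Any]): Unit
>}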
>
>Thanks
>Lionel, Liu
>
>From: 万昆
>Sent: August 20, 2019 12:30
>To: d...@griffin.incubator.apache.org
>Subject: what is the measure sink records method used for
>
>Hi, all:
>    I don't know whether the sinkRecords method is used or not.
>Can anyone help me?
>Thanks
>In ElasticSearchSink
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>In MongoSink
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>In ConsoleSink
>def sinkRecords(records: RDD[String], name: String): Unit = {
>//    println(s"${metricName} [${timeStamp}] records: ")
>//    try {
>//      val recordCount = records.count
>//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>//      val maxCount = count.toInt
>//      if (maxCount > 0) {
>//        val recordsArray = records.take(maxCount)
>//        recordsArray.foreach(println)
>//      }
>//    } catch {
>//      case e: Throwable => error(e.getMessage)
>//    }
>}
>
>def sinkRecords(records: Iterable[String], name: String): Unit = {
>//    println(s"${metricName} [${timeStamp}] records: ")
>//    try {
>//      val recordCount = records.size
>//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>//      if (count > 0) {
>//        records.foreach(println)
>//      }
>//    } catch {
>//      case e: Throwable => error(e.getMessage)
>//    }
>}
>