Hi,

First, I need to say that Griffin measures data quality, and the result is always in metric format, so only the metrics are required to be persisted; that's enough. For the mismatched records in accuracy, persisting them is just optional, to help users see the gap between the data sets easily. But in the big data world, the data sets are often large, like 1 GB/day. If the accuracy is 90%, the mismatched records would be 100 MB/day, which is still too large for most sink types. That's why we only implemented the sinkRecords method for HdfsSink by default.

1. It doesn't mean that Griffin strongly depends on HDFS; HdfsSink just helps to persist the mismatched records in accuracy cases. Griffin can still work without HdfsSink if you don't need the mismatched records in accuracy. On the other hand, S3 could also work for this purpose; users can implement whatever sink they like.
2. Furthermore, for smaller-sized storages, users can also implement the sinkRecords method to persist sample records if they'd like to (persisting large record sets is just not a good fit for sinks like ES or Mongo, etc.). A minimal sketch of such a sampling sink follows at the end of this message.
3. The four metrics output types are just provided by default; they are not all necessary. Users can enable only some of them, or use other ones implemented by themselves. There are no special requirements; users should consider the best ones for their specific environments.

Griffin has only implemented four common types of sinks; any other contributed sinks would be very welcome.
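To make point 2 concrete, here is a minimal sketch of such a sampling sink. The two sinkRecords signatures match the trait Sink methods quoted later in this thread; the class name, the maxRecords parameter and the writeSample helper are hypothetical placeholders, not part of Griffin, and would need to be adapted to the real Sink trait in your version.

import org.apache.spark.rdd.RDD

// Hypothetical sink that persists only a bounded sample of the mismatched
// records, so that smaller-sized storages are not overwhelmed.
class SampleRecordsSink(maxRecords: Int) {

  // Take at most maxRecords from a possibly huge RDD on the driver side.
  def sinkRecords(records: RDD[String], name: String): Unit =
    writeSample(name, records.take(maxRecords))

  def sinkRecords(records: Iterable[String], name: String): Unit =
    writeSample(name, records.take(maxRecords).toSeq)

  // Placeholder: replace with a real write to the storage of your choice
  // (S3, ElasticSearch, Mongo, ...).
  private def writeSample(name: String, sample: Seq[String]): Unit =
    sample.foreach(r => println(s"[$name] $r"))
}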
Thanks
Lionel, Liu

From: 万昆
Sent: Aug 21, 2019 10:07
To: dev@griffin.apache.org
Cc: d...@griffin.incubator.apache.org
Subject: Re: RE: what is the measure sink records method used for

Thanks, Lionel.

1. If the result records can only be sunk into the HDFS filesystem, that means Griffin has a strong dependence on HDFS, and other systems like S3 cannot get the result records.
2. Could the metrics and records be sunk into one component like ElasticSearch or Mongo?
3. Why should we have four metrics output types? Are there any special requirements?

* flatten: aggregation method used before sending the data frame result into the sink (the sketch at the end of this thread restates these four shapes as code):
  - default: use "array" if the data frame returned multiple records, otherwise use "entries"
  - entries: sends the first row of the data frame as the metric results, like `{"agg_col": "value"}`
  - array: wraps all metrics into a map, like `{"my_out_name": [{"agg_col": "value"}]}`
  - map: wraps the first row of the data frame into a map, like `{"my_out_name": {"agg_col": "value"}}`

At 2019-08-20 21:24:37, "Lionel, Liu" <bhlx3l...@163.com> wrote:
>Hi,
>
>The sinkRecords method is not implemented in these kinds of sinks. You should
>notice that there are two kinds of sink methods in trait Sink: the sinkRecords
>method persists the mismatched records of the original data set, which can be
>very large, while the sinkMetrics method persists the metric of a measurement,
>which is only a simple JSON object.
>Therefore, all kinds of sinks implement the sinkMetrics method, but only the
>HdfsSink class is able to persist the records. That's why, for the accuracy
>measure, the missing records are persisted only in HDFS, not in the other
>sinks.
>
>Thanks
>Lionel, Liu
>
>From: 万昆
>Sent: Aug 20, 2019 12:30
>To: d...@griffin.incubator.apache.org
>Subject: what is the measure sink records method used for
>
>Hi, all:
>  Is the sinkRecords method used anywhere or not?
>Can anyone help me?
>Thanks
>
>In ElasticSearchSink:
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>
>In MongoSink:
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>
>In ConsoleSink:
>def sinkRecords(records: RDD[String], name: String): Unit = {
>  // println(s"${metricName} [${timeStamp}] records: ")
>  // try {
>  //   val recordCount = records.count
>  //   val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>  //   val maxCount = count.toInt
>  //   if (maxCount > 0) {
>  //     val recordsArray = records.take(maxCount)
>  //     recordsArray.foreach(println)
>  //   }
>  // } catch {
>  //   case e: Throwable => error(e.getMessage)
>  // }
>}
>
>def sinkRecords(records: Iterable[String], name: String): Unit = {
>  // println(s"${metricName} [${timeStamp}] records: ")
>  // try {
>  //   val recordCount = records.size
>  //   val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>  //   if (count > 0) {
>  //     records.foreach(println)
>  //   }
>  // } catch {
>  //   case e: Throwable => error(e.getMessage)
>  // }
>}
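For question 3 above, here is a hedged illustration of the four flatten types. This is not Griffin's actual implementation, just the documented behavior from the list above restated as code; flattenMetrics, rows and outName are illustrative names.

// Illustration only: the four flatten types as a function over the collected
// data frame rows (each row represented as a column -> value map).
def flattenMetrics(rows: Seq[Map[String, Any]],
                   outName: String,
                   flatten: String): Any = flatten match {
  case "entries" => rows.headOption.getOrElse(Map.empty)    // first row only
  case "array"   => Map(outName -> rows)                    // all rows, wrapped in a map
  case "map"     => Map(outName -> rows.headOption.getOrElse(Map.empty))
  case _ =>                                                 // "default"
    if (rows.size > 1) Map(outName -> rows)                 // multiple records: acts like "array"
    else rows.headOption.getOrElse(Map.empty)               // single record: acts like "entries"
}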