Thanks, Lionel.


1. If the result records can only be written to the HDFS filesystem, that means 
Griffin has a strong dependence on HDFS, and other systems such as S3 cannot 
receive the result records.
2. Could the metrics and the records both be written to a single component such 
as ElasticSearch or Mongo?
3. Why do we have four metrics output types? Are there any special 
requirements?
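
To make questions 1 and 2 concrete, here is a minimal sketch of the split Lionel describes. It assumes a simplified trait (SimpleSink, MetricsOnlySink and InMemorySink are hypothetical names, not Griffin's actual classes, and the RDD overloads are left out), and the in-memory buffer only stands in for a real store such as ElasticSearch or Mongo:

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-in for Griffin's Sink trait.
trait SimpleSink {
  // Persist the (potentially large) set of mismatched records.
  def sinkRecords(records: Iterable[String], name: String): Unit
  // Persist the small metric object of one measurement.
  def sinkMetrics(metrics: Map[String, Any]): Unit
}

// Metrics-only sink: sinkRecords is a no-op, like the empty method bodies
// in the ElasticSearchSink and MongoSink code quoted below.
class MetricsOnlySink extends SimpleSink {
  var lastMetrics: Map[String, Any] = Map.empty[String, Any]
  def sinkRecords(records: Iterable[String], name: String): Unit = {}
  def sinkMetrics(metrics: Map[String, Any]): Unit = { lastMetrics = metrics }
}

// Hypothetical sink that keeps metrics and records in the same store.
class InMemorySink extends SimpleSink {
  val stored: mutable.Map[String, Vector[String]] = mutable.Map.empty
  var lastMetrics: Map[String, Any] = Map.empty[String, Any]
  def sinkRecords(records: Iterable[String], name: String): Unit = {
    // Append the new records under the given output name.
    stored(name) = stored.getOrElse(name, Vector.empty[String]) ++ records
  }
  def sinkMetrics(metrics: Map[String, Any]): Unit = { lastMetrics = metrics }
}
```

With this shape, supporting records in a non-HDFS sink would mean giving sinkRecords a real body in that sink; whether that is practical for large record sets is exactly what I am asking about.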


* flatten: Aggregation method used before sending data frame result into the sink:
      - default: use "array" if data frame returned multiple records, otherwise use "entries"
      - entries: sends first row of data frame as metric results, like `{"agg_col": "value"}`
      - array: wraps all metrics into a map, like `{"my_out_name": [{"agg_col": "value"}]}`
      - map: wraps first row of data frame into a map, like `{"my_out_name": {"agg_col": "value"}}`
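
As I read the four options above, they could be sketched roughly like this. This is only my understanding of the description, not Griffin's implementation; FlattenSketch, Row and the flatten signature are names I made up, and a data frame is modelled as a plain sequence of maps:

```scala
object FlattenSketch {
  // One data frame row, modelled as column name -> value (an assumption).
  type Row = Map[String, Any]

  private val emptyRow: Row = Map.empty[String, Any]

  // Shape the rows into a single metric map according to the flatten mode.
  def flatten(mode: String, name: String, rows: Seq[Row]): Map[String, Any] =
    mode match {
      // entries: the first row itself is the metric result
      case "entries" => rows.headOption.getOrElse(emptyRow)
      // array: all rows wrapped under the output name
      case "array" => Map[String, Any](name -> rows)
      // map: the first row wrapped under the output name
      case "map" => Map[String, Any](name -> rows.headOption.getOrElse(emptyRow))
      // default: "array" for multiple records, otherwise "entries"
      case _ =>
        if (rows.size > 1) Map[String, Any](name -> rows)
        else rows.headOption.getOrElse(emptyRow)
    }
}
```

So for a single row `{"agg_col": "value"}`, "entries" and "default" would give the same result, and "map" differs from "array" only in wrapping one row instead of a list.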







At 2019-08-20 21:24:37, "Lionel, Liu" <bhlx3l...@163.com> wrote:
>Hi,
>
>The sinkRecords method is not implemented in these kinds of sinks. Note that 
>there are two kinds of sink methods in trait Sink: the sinkRecords method 
>persists the mismatched records of the original data set, which can be large, 
>and the sinkMetrics method persists the metric of a measurement, which is 
>only a simple JSON object.
>Therefore, all sinks implement the sinkMetrics method, but only the HdfsSink 
>class is able to persist the records. That's why, for the accuracy measure, 
>the missing records are persisted only in HDFS, not in the other sinks.
>
>Thanks
>Lionel, Liu
>
>From: 万昆
>Sent: August 20, 2019 12:30
>To: d...@griffin.incubator.apache.org
>Subject: what is the measure sink recoreds method used for
>
>Hi all,
>    Is the sinkRecords method actually used anywhere?
>Can anyone help me?
>Thanks
>In ElasticSearchSink
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>In MongoSink
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>In ConsoleSink
>def sinkRecords(records: RDD[String], name: String): Unit = {
>//    println(s"${metricName} [${timeStamp}] records: ")
>//    try {
>//      val recordCount = records.count
>//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>//      val maxCount = count.toInt
>//      if (maxCount > 0) {
>//        val recordsArray = records.take(maxCount)
>//        recordsArray.foreach(println)
>//      }
>//    } catch {
>//      case e: Throwable => error(e.getMessage)
>//    }
>}
>
>def sinkRecords(records: Iterable[String], name: String): Unit = {
>//    println(s"${metricName} [${timeStamp}] records: ")
>//    try {
>//      val recordCount = records.size
>//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>//      if (count > 0) {
>//        records.foreach(println)
>//      }
>//    } catch {
>//      case e: Throwable => error(e.getMessage)
>//    }
>}
>
>
>
>
>
>
