Thanks, Lionel.
1. If the result records can only be sunk into the HDFS filesystem, that means
Griffin has a strong dependency on HDFS, and other systems like S3 cannot get the
result records.
2. Could the metrics and records be sunk into one component like Elasticsearch
or Mongo?
3. Why should we have four metric output types? Are there any special
requirements? My reading of the flatten options is below, with a rough sketch after the list:
* flatten: the aggregation method used before sending the data frame result into the
sink:
 - default: use "array" if the data frame returns multiple records, otherwise
use "entries"
 - entries: sends the first row of the data frame as the metric result, like
`{"agg_col": "value"}`
 - array: wraps all metric rows into an array under the output name, like
`{"my_out_name": [{"agg_col": "value"}]}`
 - map: wraps the first row of the data frame into a map, like `{"my_out_name":
{"agg_col": "value"}}`
At 2019-08-20 21:24:37, "Lionel, Liu" <[email protected]> wrote:
>Hi,
>
>The sinkRecords method is not implemented in these kinds of sinks. You should
>notice that there are two kinds of sink methods in trait Sink: the sinkRecords
>method persists the mismatched records of the original data set, which can be very
>large, while the sinkMetrics method persists the metric of a measurement, which is
>only a simple JSON object.
>Therefore, every kind of sink implements the sinkMetrics method, but only the
>HdfsSink class is able to persist the records. That's why, for the accuracy
>measure, the missing records are only persisted in HDFS, not in the other sinks.
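>In code terms the trait is roughly as follows (the sinkRecords signatures are the
>ones you quoted; the sinkMetrics signature here is approximate, please check the source):
>
>import org.apache.spark.rdd.RDD
>
>trait Sink extends Serializable {
>  // persists the mismatched records of the original data set, potentially large
>  def sinkRecords(records: RDD[String], name: String): Unit
>  def sinkRecords(records: Iterable[String], name: String): Unit
>  // persists the metric of a measurement, a small JSON-like map (approximate signature)
>  def sinkMetrics(metrics: Map[String, Any]): Unit
>}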
>
>Thanks
>Lionel, Liu
>
>From: 万昆
>Sent: 20 August 2019, 12:30
>To: [email protected]
>Subject: what is the measure sink records method used for
>
>Hi, all:
> I'd like to know whether the sinkRecords method is used or not.
>Can anyone help me?
>Thanks
>In ElasticSearchSink
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>In MongoSink
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>In ConsoleSink
>def sinkRecords(records: RDD[String], name: String): Unit = {
>// println(s"${metricName} [${timeStamp}] records: ")
>// try {
>// val recordCount = records.count
>// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>// val maxCount = count.toInt
>// if (maxCount > 0) {
>// val recordsArray = records.take(maxCount)
>// recordsArray.foreach(println)
>// }
>// } catch {
>// case e: Throwable => error(e.getMessage)
>// }
>}
>
>def sinkRecords(records: Iterable[String], name: String): Unit = {
>// println(s"${metricName} [${timeStamp}] records: ")
>// try {
>// val recordCount = records.size
>// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>// if (count > 0) {
>// records.foreach(println)
>// }
>// } catch {
>// case e: Throwable => error(e.getMessage)
>// }
>}
>