Thanks, Lionel.
1. If the result records can only be written to HDFS, then Griffin has a strong dependence on HDFS, and other systems such as S3 cannot get the result records.
2. Could the metrics and the records both be written to a single component, such as ElasticSearch or Mongo?
3. Why do we need four metric output types? Are there any special requirements?

* flatten: Aggregation method used before sending the data frame result into the sink:
  - default: use "array" if the data frame returned multiple records, otherwise use "entries"
  - entries: sends the first row of the data frame as the metric results, like `{"agg_col": "value"}`
  - array: wraps all metrics into a map, like `{"my_out_name": [{"agg_col": "value"}]}`
  - map: wraps the first row of the data frame into a map, like `{"my_out_name": {"agg_col": "value"}}`

At 2019-08-20 21:24:37, "Lionel, Liu" <bhlx3l...@163.com> wrote:
>Hi,
>
>The sinkRecords method is not implemented in these kinds of sinks. Note that
>there are two kinds of sink methods in trait Sink: the sinkRecords method
>persists the mismatched records of the original data set, which can be large,
>and the sinkMetrics method persists the metric of a measurement, which is
>only a simple JSON object.
>Therefore, all kinds of sinks implement the sinkMetrics method, but only the
>HdfsSink class can persist the records. That's why, for the accuracy measure,
>the missing records are only persisted in HDFS and not in the other sinks.
>
>Thanks
>Lionel, Liu
>
>From: 万昆
>Sent: August 20, 2019 12:30
>To: d...@griffin.incubator.apache.org
>Subject: What is the measure sinkRecords method used for?
>
>Hi, all:
>  I don't know whether the sinkRecords method is used or not.
>Can anyone help me?
>Thanks
>
>In ElasticSearchSink:
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>
>In MongoSink:
>def sinkRecords(records: RDD[String], name: String): Unit = {}
>def sinkRecords(records: Iterable[String], name: String): Unit = {}
>
>In ConsoleSink:
>def sinkRecords(records: RDD[String], name: String): Unit = {
>//  println(s"${metricName} [${timeStamp}] records: ")
>//  try {
>//    val recordCount = records.count
>//    val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>//    val maxCount = count.toInt
>//    if (maxCount > 0) {
>//      val recordsArray = records.take(maxCount)
>//      recordsArray.foreach(println)
>//    }
>//  } catch {
>//    case e: Throwable => error(e.getMessage)
>//  }
>}
>
>def sinkRecords(records: Iterable[String], name: String): Unit = {
>//  println(s"${metricName} [${timeStamp}] records: ")
>//  try {
>//    val recordCount = records.size
>//    val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
>//    if (count > 0) {
>//      records.foreach(println)
>//    }
>//  } catch {
>//    case e: Throwable => error(e.getMessage)
>//  }
>}
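Lionel's point about the two methods on trait Sink, and question 1 about reaching systems other than HDFS, can be illustrated with a minimal sketch. It assumes a simplified, hypothetical `SimpleSink` trait with only the `Iterable[String]` overload (no Spark `RDD`), plus hypothetical `MetricsOnlySink` and `FileSink` classes; a local directory stands in for HDFS or S3. This is not Griffin's actual Sink API, only an illustration that records reach a backend only when that sink overrides `sinkRecords`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Griffin's Sink trait
// (the real trait also has RDD overloads, open/close, etc.).
trait SimpleSink {
  // Every sink handles metrics: a small JSON-like map per measurement.
  def sinkMetrics(metrics: Map[String, Any]): Unit

  // Default: silently drop records, which is effectively what the empty
  // sinkRecords bodies in ElasticSearchSink and MongoSink do.
  def sinkRecords(records: Iterable[String], name: String): Unit = ()
}

// Hypothetical metrics-only sink: keeps metrics, ignores records.
class MetricsOnlySink extends SimpleSink {
  val received: ArrayBuffer[Map[String, Any]] = ArrayBuffer.empty
  def sinkMetrics(metrics: Map[String, Any]): Unit = received += metrics
}

// Hypothetical file-backed sink playing the role of HdfsSink:
// it overrides sinkRecords, so mismatched records are actually persisted.
class FileSink(dir: Path) extends SimpleSink {
  def sinkMetrics(metrics: Map[String, Any]): Unit = {
    val line = metrics.map { case (k, v) => s"$k=$v" }.mkString(",")
    Files.write(dir.resolve("metrics.txt"), line.getBytes(StandardCharsets.UTF_8))
  }

  // One file per record set, one record per line.
  override def sinkRecords(records: Iterable[String], name: String): Unit =
    Files.write(dir.resolve(name), records.mkString("\n").getBytes(StandardCharsets.UTF_8))
}
```

Under this split, getting records into S3, ElasticSearch, or Mongo would mean overriding `sinkRecords` in the corresponding sink; the metrics path stays unchanged.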
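The four `flatten` options quoted in question 3 can be sketched as follows, assuming each data-frame row is modeled as a `Map[String, Any]`. `FlattenSketch`, `flatten`, and the mode objects are hypothetical names; the sketch only mirrors the quoted descriptions and is not Griffin's code. Returning an empty map for `entries` when there are no rows is an added assumption.

```scala
// Hypothetical model of the quoted `flatten` options; not Griffin's implementation.
object FlattenSketch {
  type Row = Map[String, Any]

  sealed trait Flatten
  case object Default extends Flatten
  case object Entries extends Flatten
  case object ArrayMode extends Flatten
  case object MapMode extends Flatten

  def flatten(rows: Seq[Row], outName: String, mode: Flatten): Map[String, Any] =
    mode match {
      case Default =>
        // "array" if there are multiple rows, otherwise "entries"
        if (rows.size > 1) flatten(rows, outName, ArrayMode)
        else flatten(rows, outName, Entries)
      case Entries =>
        // the first row's columns become the metric entries directly
        rows.headOption.getOrElse(Map.empty[String, Any])
      case ArrayMode =>
        // all rows, as a list, under the output name
        Map(outName -> rows)
      case MapMode =>
        // only the first row, nested under the output name
        Map(outName -> rows.headOption.getOrElse(Map.empty[String, Any]))
    }
}
```

For a single row `Map("agg_col" -> "value")` and output name `"my_out_name"`, the modes yield the shapes in the quoted examples: `entries` gives the row itself, `array` nests a one-element list, and `map` nests the row directly.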