Hi, The sinkRecords method is not implemented in these kinds of sinks. You should notice that there are two kinds of sink methods in trait Sink, sinkRecords method persist the mismatched records of original data set, which is in large size, and sinkMetrics method persist the metric of a measurement, which is only a simple json object. Therefore, all the kinds of sinks implemented the sinkMetrics method, but only the HdfsSink class could be able to persist the records. That’s why for accuracy measure, the missing records are only persist in Hdfs, not in the other sink ways.
Thanks Lionel, Liu From: 万昆 Sent: 2019年8月20日 12:30 To: [email protected] Subject: what is the measure sink recoreds method used for Hi,All : I don't know the sinkRecords method is used or not? Can anyone help me? Thanks In ElasticSearchSink def sinkRecords(records: RDD[String], name: String): Unit = {} def sinkRecords(records: Iterable[String], name: String): Unit = {} In MongoSink def sinkRecords(records: RDD[String], name: String): Unit = {} def sinkRecords(records: Iterable[String], name: String): Unit = {} In ConsoleSink def sinkRecords(records: RDD[String], name: String): Unit = { // println(s"${metricName} [${timeStamp}] records: ") // try { // val recordCount = records.count // val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) // val maxCount = count.toInt // if (maxCount > 0) { // val recordsArray = records.take(maxCount) // recordsArray.foreach(println) // } // } catch { // case e: Throwable => error(e.getMessage) // } } def sinkRecords(records: Iterable[String], name: String): Unit = { // println(s"${metricName} [${timeStamp}] records: ") // try { // val recordCount = records.size // val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) // if (count > 0) { // records.foreach(println) // } // } catch { // case e: Throwable => error(e.getMessage) // } }
