Hi all,
    I can't tell whether the sinkRecords method is actually used or not.
Can anyone help me?
Thanks
In ElasticSearchSink
/** Currently a no-op: the body is empty, so `records` and `name` are ignored. */
def sinkRecords(records: RDD[String], name: String): Unit = ()
/** Currently a no-op: the body is empty, so `records` and `name` are ignored. */
def sinkRecords(records: Iterable[String], name: String): Unit = ()
In MongoSink
/** Currently a no-op: the body is empty, so `records` and `name` are ignored. */
def sinkRecords(records: RDD[String], name: String): Unit = ()
/** Currently a no-op: the body is empty, so `records` and `name` are ignored. */
def sinkRecords(records: Iterable[String], name: String): Unit = ()
In ConsoleSink
/**
 * Accepts an RDD of records together with a name.
 *
 * Currently a no-op: the entire implementation below is commented out, so
 * `records` and `name` are ignored and nothing is printed.
 *
 * @param records the records handed to this sink (unused)
 * @param name    the name associated with the records (unused)
 */
def sinkRecords(records: RDD[String], name: String): Unit = {
//    println(s"${metricName} [${timeStamp}] records: ")
//    try {
//      val recordCount = records.count
//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
//      val maxCount = count.toInt
//      if (maxCount > 0) {
//        val recordsArray = records.take(maxCount)
//        recordsArray.foreach(println)
//      }
//    } catch {
//      case e: Throwable => error(e.getMessage)
//    }
}

/**
 * Accepts an in-memory collection of records together with a name.
 *
 * Currently a no-op: the entire implementation below is commented out, so
 * `records` and `name` are ignored and nothing is printed.
 *
 * @param records the records handed to this sink (unused)
 * @param name    the name associated with the records (unused)
 */
def sinkRecords(records: Iterable[String], name: String): Unit = {
//    println(s"${metricName} [${timeStamp}] records: ")
//    try {
//      val recordCount = records.size
//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
//      if (count > 0) {
//        records.foreach(println)
//      }
//    } catch {
//      case e: Throwable => error(e.getMessage)
//    }
}





Reply via email to