Hi, I am working on collecting Elasticsearch metrics (bytes_sent, bytes_accepted and bytes_retries) whenever we read/write data from/to Elasticsearch in a Spark job. We tried to use a Spark listener, but still got 0 bytes from the listener class below.
```scala
package com.moesif.spark

import org.apache.log4j.Logger
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerTaskEnd}

import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

/** Spark listener class to handle the Spark events */
class SparkMetricsListener extends SparkListener {
  val log = Logger.getLogger(getClass.getName)

  val jobsCompleted = new AtomicInteger(0)
  val stagesCompleted = new AtomicInteger(0)
  val tasksCompleted = new AtomicInteger(0)
  val executorRuntime = new AtomicLong(0L)
  val recordsRead = new AtomicLong(0L)
  val recordsWritten = new AtomicLong(0L)
  val bytesRead = new AtomicLong(0L)
  val bytesWritten = new AtomicLong(0L)
  val resultSize = new AtomicLong(0L)

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    log.info("***************** Aggregate metrics *****************************")
    log.info(s"* Jobs = ${jobsCompleted.get()}, Stages = ${stagesCompleted.get()}, Tasks = ${tasksCompleted.get()}")
    log.info(s"* Executor runtime = ${executorRuntime.get()}ms, Records read = ${recordsRead.get()}, Records written = ${recordsWritten.get()}")
    log.info(s"* bytesRead = ${bytesRead.get()} bytes, bytesWritten = ${bytesWritten.get()} bytes")
    log.info(s"* result size = ${resultSize.get()}")
    log.info("*****************************************************************")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    jobsCompleted.incrementAndGet()
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stagesCompleted.incrementAndGet()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    tasksCompleted.incrementAndGet()
    executorRuntime.addAndGet(taskEnd.taskMetrics.executorRunTime)
    recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
    recordsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
    bytesRead.addAndGet(taskEnd.taskMetrics.inputMetrics.bytesRead)
    bytesWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.bytesWritten)
    resultSize.addAndGet(taskEnd.taskMetrics.resultSize)
    // Print every accumulable attached to the task
    taskEnd.taskInfo.accumulables.foreach(a => println("** " + a.name.get + ": " + a.value.get))
    println("taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten " + taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten)
  }
}
```
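For completeness, this is how we wire the listener into the job (a minimal sketch; either the `spark.extraListeners` property or the programmatic call should work):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.moesif.spark.SparkMetricsListener

// Option 1: register via configuration (the class must be on the driver classpath)
val conf = new SparkConf()
  .set("spark.extraListeners", "com.moesif.spark.SparkMetricsListener")

// Option 2: register programmatically, before any jobs run
val sc = new SparkContext(conf)
sc.addSparkListener(new SparkMetricsListener())
```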
We also tried to use the recordsRead, recordsWritten, bytesRead and bytesWritten from each task to monitor the data flowing between the RDD and Elasticsearch, but still got 0 after writing to Elasticsearch with the listener attached to the Spark job below.
```scala
import org.apache.hadoop.mapred.Counters
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.hadoop.mr.Counter

object MySparkJob {
  def main(args: Array[String]): Unit = {
    // A freshly constructed Counters instance -- this always prints 0
    val counters: Counters = new Counters()
    println(counters.getCounter(Counter.BYTES_SENT))

    val conf = new SparkConf().setAppName(getClass.getName)
      .setMaster("local")
      .set("spark.metrics.namespace", "${spark.app.name}")
      .set("spark.network.timeout", "600s")
      .set("es.nodes", "0.0.0.0")
      .set("es.port", "9200")
      .set("es.http.retries", "10")
      .set("es.http.timeout", "3m")
      .set("es.nodes.wan.only", "true")
      .set("es.index.read.missing.as.empty", "yes")
      .set("es.write.operation", "upsert")
      .set("es.batch.write.refresh", "false")
      .set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")

    val sc = new SparkContext(conf)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
    val mySaveRDD = sc.makeRDD(Seq(numbers, airports))
    // mySaveRDD.saveToEs("spark/docs/1112")

    val myLoadRDD = sc.esRDD("spark/docs")
    val result: Array[(String, collection.Map[String, AnyRef])] = myLoadRDD.collect()
    println("End")
  }
}
```
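One thing we plan to try next (a sketch, not yet verified; the class name is our own) is dumping every stage-level accumulable, in case the es-hadoop counters are registered there rather than in the task input/output metrics:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageAccumulablesDumper extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    // Print every accumulable the stage reports, hoping the es-hadoop
    // counters (bytes_sent etc.) appear among them.
    stageCompleted.stageInfo.accumulables.values.foreach { acc =>
      println(s"stage accumulable: ${acc.name.getOrElse("?")} = ${acc.value.getOrElse("?")}")
    }
  }
}
```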
The `recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)` call in the listener is one of the lines that only ever accumulates 0.

We also tried to get the executor metrics (more details: https://spark.apache.org/docs/latest/monitoring.html#executor-metrics), but the route `http://localhost:4040/api/v1/applications/[app-id]/executors` does not work for us, even with `.set("spark.ui.prometheus.enabled", "true")` in the SparkConf.

Can anyone suggest how to collect these metrics correctly, or point to some examples? Thank you for any help in advance.
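Edit: one guess is that the `[app-id]` segment has to be the real application id. A minimal sketch of what we will try (assuming the driver UI is on the default port 4040 and using `sc.applicationId` for the id; `Source.fromURL` is just the quickest way to fetch the JSON):

```scala
import scala.io.Source

val appId = sc.applicationId // e.g. "local-1639012345678" in local mode
val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
// Fetch the JSON payload describing each executor and its metrics
val json = Source.fromURL(url).mkString
println(json)
```

As far as we can tell from the monitoring docs, `spark.ui.prometheus.enabled` only affects the Prometheus endpoint (`/metrics/executors/prometheus`), so the JSON route above should not need it.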