Hi, I am working on collecting Elasticsearch metrics (bytes_sent, bytes_accepted, and bytes_retries) whenever we read/write data from Elasticsearch in a Spark job.

We tried to use a Spark listener, but it still reported 0 bytes. The listener class we created is shown below.
package com.moesif.spark

import org.apache.log4j.Logger
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd,
  SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerTaskEnd}

import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

/**
  * Spark listener class to handle the Spark events
  */
class SparkMetricsListener extends SparkListener {

  val log = Logger.getLogger(getClass.getName)

  val jobsCompleted   = new AtomicInteger(0)
  val stagesCompleted = new AtomicInteger(0)
  val tasksCompleted  = new AtomicInteger(0)
  val executorRuntime = new AtomicLong(0L)
  val recordsRead     = new AtomicLong(0L)
  val recordsWritten  = new AtomicLong(0L)
  val bytesRead       = new AtomicLong(0L)
  val bytesWritten    = new AtomicLong(0L)
  val resultSize      = new AtomicLong(0L)


  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    log.info("***************** Aggregate metrics *****************************")
    log.info(s"* Jobs = ${jobsCompleted.get()}, Stages = ${stagesCompleted.get()}, Tasks = ${tasksCompleted.get()}")
    log.info(s"* Executor runtime = ${executorRuntime.get()}ms, Records read = ${recordsRead.get()}, Records written = ${recordsWritten.get()}")
    log.info(s"* Bytes read = ${bytesRead.get()}, Bytes written = ${bytesWritten.get()}")
    log.info(s"* Result size = ${resultSize.get()} bytes")
    log.info("*****************************************************************")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    jobsCompleted.incrementAndGet()
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    stagesCompleted.incrementAndGet()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    tasksCompleted.incrementAndGet()
    executorRuntime.addAndGet(taskEnd.taskMetrics.executorRunTime)
    recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
    recordsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
    bytesRead.addAndGet(taskEnd.taskMetrics.inputMetrics.bytesRead)
    bytesWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.bytesWritten)
    resultSize.addAndGet(taskEnd.taskMetrics.resultSize)

    // Dump every accumulator attached to the task. Both name and value are
    // Options, so use getOrElse rather than .get to avoid NoSuchElementException.
    taskEnd.taskInfo.accumulables.foreach(a =>
      println("** " + a.name.getOrElse("<unnamed>") + ": " + a.value.getOrElse("")))

    println("taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten " +
      taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten)
  }
}
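For the listener to see any events it has to be registered before the jobs run; here is a minimal sketch of how we wire it up (the object name and the trivial count() job are just for illustration):

```scala
import com.moesif.spark.SparkMetricsListener
import org.apache.spark.{SparkConf, SparkContext}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("listener-demo")
      .setMaster("local")
      // Option 1: let Spark instantiate the listener from configuration.
      .set("spark.extraListeners", "com.moesif.spark.SparkMetricsListener")

    val sc = new SparkContext(conf)

    // Option 2: register an instance programmatically before running any jobs.
    // (Use one option or the other; both together would double-count.)
    sc.addSparkListener(new SparkMetricsListener())

    sc.parallelize(1 to 100).count() // trigger some task events
    sc.stop() // fires onApplicationEnd, which logs the aggregated totals
  }
}
```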
We also tried to use the recordsRead, recordsWritten, bytesRead, and bytesWritten from each task to monitor the data flowing between the RDD and Elasticsearch, but they were all 0 after writing data to Elasticsearch with the listener attached to the Spark job:
import org.apache.hadoop.mapred.Counters
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.hadoop.mr.Counter

object MySparkJob {
  def main(args: Array[String]) {

    // This Counters instance is freshly constructed and never attached to a
    // running job, so it always reads 0 here.
    val counters: Counters = new Counters()
    println(counters.getCounter(Counter.BYTES_SENT))

    val conf = new SparkConf().setAppName(getClass.getName)
      .setMaster("local")
      .set("spark.metrics.namespace", "${spark.app.name}")
      .set("spark.network.timeout", "600s")
      .set("es.nodes", "0.0.0.0")
      .set("es.port", "9200")
      .set("es.http.retries", "10")
      .set("es.http.timeout", "3m")
      .set("es.nodes.wan.only", "true")
      .set("es.index.read.missing.as.empty", "yes")
      .set("es.write.operation", "upsert")
      .set("es.batch.write.refresh", "false")
      .set("spark.metrics.conf.*.sink.console.class", 
"org.apache.spark.metrics.sink.ConsoleSink")

    val sc = new SparkContext(conf)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    val mySaveRDD = sc.makeRDD(
      Seq(numbers, airports)
    )
    // mySaveRDD.saveToEs("spark/docs/1112") // uncomment to actually write to Elasticsearch

    val myLoadRDD = sc.esRDD("spark/docs")
    val result: Array[(String, collection.Map[String, AnyRef])] = myLoadRDD.collect()
    println("End")
  }
}
These are the specific listener lines that always come back as 0 for the Elasticsearch reads and writes:
```
recordsRead.addAndGet(taskEnd.taskMetrics.inputMetrics.recordsRead)
recordsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
bytesRead.addAndGet(taskEnd.taskMetrics.inputMetrics.bytesRead)
bytesWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.bytesWritten)
```
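Our understanding is that the task-level inputMetrics/outputMetrics are only filled in by Spark's built-in Hadoop/file read and write paths, while elasticsearch-hadoop moves data through its own REST client, which would explain the zeros. As a workaround we can count what we send ourselves with accumulators; a minimal sketch, where the byte figure is the length of the serialized document (an approximation of the payload, not the exact wire bytes, and accumulator updates can double-count if a task is retried):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object EsAccumulatorMetrics {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-accumulator-metrics")
      .setMaster("local")
      .set("es.nodes", "0.0.0.0")
      .set("es.port", "9200")
    val sc = new SparkContext(conf)

    val docsWritten = sc.longAccumulator("es.docs.written")
    val bytesSerialized = sc.longAccumulator("es.bytes.serialized")

    val docs = sc.makeRDD(Seq(Map("one" -> 1), Map("two" -> 2)))

    // Count documents and approximate payload size on the way to Elasticsearch.
    val counted = docs.map { doc =>
      docsWritten.add(1)
      bytesSerialized.add(doc.toString.length.toLong) // rough proxy for JSON size
      doc
    }
    counted.saveToEs("spark/docs")

    println(s"docs written = ${docsWritten.value}, approx bytes = ${bytesSerialized.value}")
    sc.stop()
  }
}
```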

We also tried to get the executor metrics (more details: https://spark.apache.org/docs/latest/monitoring.html#executor-metrics), but the REST route http://localhost:4040/api/v1/applications/[app-id]/executors does not work for us, even with the following settings:

.set("spark.ui.prometheus.enabled", "true")
.set("spark.eventLog.logStageExecutorMetrics", "true")

Could we get any suggestions, or some examples, of how to collect these metrics correctly?
Thank you in advance for any help.
