[ 
https://issues.apache.org/jira/browse/SPARK-22052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Taaffe updated SPARK-22052:
---------------------------------
    Comment: was deleted

(was: Please check the attached file "Processed Rows Per Second".
After altering the code the correct values are displayed in column B. 
They match the data from the INFO StreamExecution displayed during streaming)

> Incorrect Metric reported in MetricsReporter.scala
> --------------------------------------------------
>
>                 Key: SPARK-22052
>                 URL: https://issues.apache.org/jira/browse/SPARK-22052
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Structured Streaming
>    Affects Versions: 2.2.0, 2.3.0
>         Environment: Spark 2.2 
> MetricsReporter.scala
>            Reporter: Jason Taaffe
>            Priority: Minor
>              Labels: easyfix
>         Attachments: Processed Rows Per Second.png
>
>
> The wrong metric is being reported in MetricsReporter.scala.
> The current implementation of processingRate-total uses the wrong metric:
> compare the first and second registerGauge calls below. The second one 
> mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond.
> {code:java}
> class MetricsReporter(
>     stream: StreamExecution,
>     override val sourceName: String) extends CodahaleSource with Logging {
>
>   override val metricRegistry: MetricRegistry = new MetricRegistry
>
>   // Metric names should not have . in them, so that all the metrics of a query
>   // are identified together in Ganglia as a single metric group
>   registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
>   registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond)
>   registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
>
>   private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
>     synchronized {
>       metricRegistry.register(name, new Gauge[T] {
>         override def getValue: T = f()
>       })
>     }
>   }
> }
> {code}
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala]
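> A minimal sketch of the proposed one-line fix (the surrounding code is 
> unchanged; StreamingQueryProgress already exposes processedRowsPerSecond):
> {code:java}
> // before (bug): both gauges read the same field, so the two CSV files match
> registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond)
> // after (fix): report the actual processing rate
> registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
> {code}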
> After adjusting the line and rebuilding from source, I tested the change by 
> checking the CSV files produced via the metrics properties file. Previously, 
> inputRate-total and processingRate-total were identical because the same 
> metric was used for both. After the change, the processingRate-total file 
> held the correct value.
> Please check the attached file "Processed Rows Per Second".
> After altering the code, the correct values are displayed in column B.
> They match the data from the INFO StreamExecution log displayed during streaming.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
