This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e892a01  [SPARK-31683][CORE] Make Prometheus output consistent with 
DropWizard 4.1 result
e892a01 is described below

commit e892a016699d996b959b4db01242cff934d62f76
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue May 12 19:57:48 2020 +0000

    [SPARK-31683][CORE] Make Prometheus output consistent with DropWizard 4.1 
result
    
    ### What changes were proposed in this pull request?
    
    This PR aims to update Prometheus-related output format to be consistent 
with DropWizard 4.1 result.
    - Add `Number` metrics for gauges metrics.
    - Add `type` labels.
    
    ### Why are the changes needed?
    
    SPARK-29032 added Prometheus support. After that, SPARK-29674 upgraded 
DropWizard for JDK9+ support and this caused difference in output labels and 
number of keys for Gauge metrics. The current status is different from Apache 
Spark 2.4.5. Since we cannot change DropWizard, this PR aims to be consistent 
in Apache Spark 3.0.0 only.
    
    **DropWizard 3.x**
    ```
    metrics_master_aliveWorkers_Value 1.0
    ```
    
    **DropWizard 4.1**
    ```
    metrics_master_aliveWorkers_Value{type="gauges",} 1.0
    metrics_master_aliveWorkers_Number{type="gauges",} 1.0
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but this is a new feature in 3.0.0.
    
    ### How was this patch tested?
    
    Manually check the output like the following.
    
    **JMXExporter Result**
    ```
    $ curl -s http://localhost:8088/ | grep "^metrics_master" | sort
    metrics_master_aliveWorkers_Number{type="gauges",} 1.0
    metrics_master_aliveWorkers_Value{type="gauges",} 1.0
    metrics_master_apps_Number{type="gauges",} 0.0
    metrics_master_apps_Value{type="gauges",} 0.0
    metrics_master_waitingApps_Number{type="gauges",} 0.0
    metrics_master_waitingApps_Value{type="gauges",} 0.0
    metrics_master_workers_Number{type="gauges",} 1.0
    metrics_master_workers_Value{type="gauges",} 1.0
    ```
    
    **This PR**
    ```
    $ curl -s http://localhost:8080/metrics/master/prometheus/ | grep master
    metrics_master_aliveWorkers_Number{type="gauges"} 1
    metrics_master_aliveWorkers_Value{type="gauges"} 1
    metrics_master_apps_Number{type="gauges"} 0
    metrics_master_apps_Value{type="gauges"} 0
    metrics_master_waitingApps_Number{type="gauges"} 0
    metrics_master_waitingApps_Value{type="gauges"} 0
    metrics_master_workers_Number{type="gauges"} 1
    metrics_master_workers_Value{type="gauges"} 1
    ```
    
    Closes #28510 from dongjoon-hyun/SPARK-31683.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: DB Tsai <d_t...@apple.com>
    (cherry picked from commit 07209f3e2deab824f04484fa6b8bab0ec0a635d6)
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../spark/metrics/sink/PrometheusServlet.scala     | 73 ++++++++++++----------
 .../spark/status/api/v1/PrometheusResource.scala   | 52 +++++++--------
 2 files changed, 67 insertions(+), 58 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
index 011c7bc..59b863b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
@@ -56,58 +56,65 @@ private[spark] class PrometheusServlet(
   def getMetricsSnapshot(request: HttpServletRequest): String = {
     import scala.collection.JavaConverters._
 
+    val guagesLabel = """{type="gauges"}"""
+    val countersLabel = """{type="counters"}"""
+    val metersLabel = countersLabel
+    val histogramslabels = """{type="histograms"}"""
+    val timersLabels = """{type="timers"}"""
+
     val sb = new StringBuilder()
     registry.getGauges.asScala.foreach { case (k, v) =>
       if (!v.getValue.isInstanceOf[String]) {
-        sb.append(s"${normalizeKey(k)}Value ${v.getValue}\n")
+        sb.append(s"${normalizeKey(k)}Number$guagesLabel ${v.getValue}\n")
+        sb.append(s"${normalizeKey(k)}Value$guagesLabel ${v.getValue}\n")
       }
     }
     registry.getCounters.asScala.foreach { case (k, v) =>
-      sb.append(s"${normalizeKey(k)}Count ${v.getCount}\n")
+      sb.append(s"${normalizeKey(k)}Count$countersLabel ${v.getCount}\n")
     }
     registry.getHistograms.asScala.foreach { case (k, h) =>
       val snapshot = h.getSnapshot
       val prefix = normalizeKey(k)
-      sb.append(s"${prefix}Count ${h.getCount}\n")
-      sb.append(s"${prefix}Max ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Mean ${snapshot.getMean}\n")
-      sb.append(s"${prefix}Min ${snapshot.getMin}\n")
-      sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
-      sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
-      sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
-      sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
-      sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
-      sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
-      sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
+      sb.append(s"${prefix}Count$histogramslabels ${h.getCount}\n")
+      sb.append(s"${prefix}Max$histogramslabels ${snapshot.getMax}\n")
+      sb.append(s"${prefix}Mean$histogramslabels ${snapshot.getMean}\n")
+      sb.append(s"${prefix}Min$histogramslabels ${snapshot.getMin}\n")
+      sb.append(s"${prefix}50thPercentile$histogramslabels 
${snapshot.getMedian}\n")
+      sb.append(s"${prefix}75thPercentile$histogramslabels 
${snapshot.get75thPercentile}\n")
+      sb.append(s"${prefix}95thPercentile$histogramslabels 
${snapshot.get95thPercentile}\n")
+      sb.append(s"${prefix}98thPercentile$histogramslabels 
${snapshot.get98thPercentile}\n")
+      sb.append(s"${prefix}99thPercentile$histogramslabels 
${snapshot.get99thPercentile}\n")
+      sb.append(s"${prefix}999thPercentile$histogramslabels 
${snapshot.get999thPercentile}\n")
+      sb.append(s"${prefix}StdDev$histogramslabels ${snapshot.getStdDev}\n")
     }
     registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
       val prefix = normalizeKey(kv.getKey)
       val meter = kv.getValue
-      sb.append(s"${prefix}Count ${meter.getCount}\n")
-      sb.append(s"${prefix}MeanRate ${meter.getMeanRate}\n")
-      sb.append(s"${prefix}OneMinuteRate ${meter.getOneMinuteRate}\n")
-      sb.append(s"${prefix}FiveMinuteRate ${meter.getFiveMinuteRate}\n")
-      sb.append(s"${prefix}FifteenMinuteRate ${meter.getFifteenMinuteRate}\n")
+      sb.append(s"${prefix}Count$metersLabel ${meter.getCount}\n")
+      sb.append(s"${prefix}MeanRate$metersLabel ${meter.getMeanRate}\n")
+      sb.append(s"${prefix}OneMinuteRate$metersLabel 
${meter.getOneMinuteRate}\n")
+      sb.append(s"${prefix}FiveMinuteRate$metersLabel 
${meter.getFiveMinuteRate}\n")
+      sb.append(s"${prefix}FifteenMinuteRate$metersLabel 
${meter.getFifteenMinuteRate}\n")
     }
     registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
       val prefix = normalizeKey(kv.getKey)
       val timer = kv.getValue
       val snapshot = timer.getSnapshot
-      sb.append(s"${prefix}Count ${timer.getCount}\n")
-      sb.append(s"${prefix}Max ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Mean ${snapshot.getMax}\n")
-      sb.append(s"${prefix}Min ${snapshot.getMin}\n")
-      sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
-      sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
-      sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
-      sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
-      sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
-      sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
-      sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
-      sb.append(s"${prefix}FifteenMinuteRate ${timer.getFifteenMinuteRate}\n")
-      sb.append(s"${prefix}FiveMinuteRate ${timer.getFiveMinuteRate}\n")
-      sb.append(s"${prefix}OneMinuteRate ${timer.getOneMinuteRate}\n")
-      sb.append(s"${prefix}MeanRate ${timer.getMeanRate}\n")
+      sb.append(s"${prefix}Count$timersLabels ${timer.getCount}\n")
+      sb.append(s"${prefix}Max$timersLabels ${snapshot.getMax}\n")
+      sb.append(s"${prefix}Mean$timersLabels ${snapshot.getMax}\n")
+      sb.append(s"${prefix}Min$timersLabels ${snapshot.getMin}\n")
+      sb.append(s"${prefix}50thPercentile$timersLabels 
${snapshot.getMedian}\n")
+      sb.append(s"${prefix}75thPercentile$timersLabels 
${snapshot.get75thPercentile}\n")
+      sb.append(s"${prefix}95thPercentile$timersLabels 
${snapshot.get95thPercentile}\n")
+      sb.append(s"${prefix}98thPercentile$timersLabels 
${snapshot.get98thPercentile}\n")
+      sb.append(s"${prefix}99thPercentile$timersLabels 
${snapshot.get99thPercentile}\n")
+      sb.append(s"${prefix}999thPercentile$timersLabels 
${snapshot.get999thPercentile}\n")
+      sb.append(s"${prefix}StdDev$timersLabels ${snapshot.getStdDev}\n")
+      sb.append(s"${prefix}FifteenMinuteRate$timersLabels 
${timer.getFifteenMinuteRate}\n")
+      sb.append(s"${prefix}FiveMinuteRate$timersLabels 
${timer.getFiveMinuteRate}\n")
+      sb.append(s"${prefix}OneMinuteRate$timersLabels 
${timer.getOneMinuteRate}\n")
+      sb.append(s"${prefix}MeanRate$timersLabels ${timer.getMeanRate}\n")
     }
     sb.toString()
   }
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
index 2a5f151..4ed3d45 100644
--- 
a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
@@ -50,27 +50,27 @@ private[v1] class PrometheusResource extends 
ApiRequestContext {
         "application_name" -> store.applicationInfo.name,
         "executor_id" -> executor.id
       ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
-      sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n")
-      sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n")
-      sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n")
-      sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n")
-      sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n")
-      sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n")
-      sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n")
-      sb.append(s"${prefix}completedTasks_Count$labels 
${executor.completedTasks}\n")
-      sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n")
-      sb.append(s"${prefix}totalDuration_Value$labels 
${executor.totalDuration}\n")
-      sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n")
-      sb.append(s"${prefix}totalInputBytes_Count$labels 
${executor.totalInputBytes}\n")
-      sb.append(s"${prefix}totalShuffleRead_Count$labels 
${executor.totalShuffleRead}\n")
-      sb.append(s"${prefix}totalShuffleWrite_Count$labels 
${executor.totalShuffleWrite}\n")
-      sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n")
+      sb.append(s"${prefix}rddBlocks$labels ${executor.rddBlocks}\n")
+      sb.append(s"${prefix}memoryUsed_bytes$labels ${executor.memoryUsed}\n")
+      sb.append(s"${prefix}diskUsed_bytes$labels ${executor.diskUsed}\n")
+      sb.append(s"${prefix}totalCores$labels ${executor.totalCores}\n")
+      sb.append(s"${prefix}maxTasks$labels ${executor.maxTasks}\n")
+      sb.append(s"${prefix}activeTasks$labels ${executor.activeTasks}\n")
+      sb.append(s"${prefix}failedTasks_total$labels ${executor.failedTasks}\n")
+      sb.append(s"${prefix}completedTasks_total$labels 
${executor.completedTasks}\n")
+      sb.append(s"${prefix}totalTasks_total$labels ${executor.totalTasks}\n")
+      sb.append(s"${prefix}totalDuration_seconds_total$labels 
${executor.totalDuration * 0.001}\n")
+      sb.append(s"${prefix}totalGCTime_seconds_total$labels 
${executor.totalGCTime * 0.001}\n")
+      sb.append(s"${prefix}totalInputBytes_bytes_total$labels 
${executor.totalInputBytes}\n")
+      sb.append(s"${prefix}totalShuffleRead_bytes_total$labels 
${executor.totalShuffleRead}\n")
+      sb.append(s"${prefix}totalShuffleWrite_bytes_total$labels 
${executor.totalShuffleWrite}\n")
+      sb.append(s"${prefix}maxMemory_bytes$labels ${executor.maxMemory}\n")
       executor.executorLogs.foreach { case (k, v) => }
       executor.memoryMetrics.foreach { m =>
-        sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels 
${m.usedOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels 
${m.usedOffHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels 
${m.totalOnHeapStorageMemory}\n")
-        sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " +
+        sb.append(s"${prefix}usedOnHeapStorageMemory_bytes$labels 
${m.usedOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}usedOffHeapStorageMemory_bytes$labels 
${m.usedOffHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOnHeapStorageMemory_bytes$labels 
${m.totalOnHeapStorageMemory}\n")
+        sb.append(s"${prefix}totalOffHeapStorageMemory_bytes$labels " +
           s"${m.totalOffHeapStorageMemory}\n")
       }
       executor.peakMemoryMetrics.foreach { m =>
@@ -90,14 +90,16 @@ private[v1] class PrometheusResource extends 
ApiRequestContext {
           "ProcessTreePythonVMemory",
           "ProcessTreePythonRSSMemory",
           "ProcessTreeOtherVMemory",
-          "ProcessTreeOtherRSSMemory",
-          "MinorGCCount",
-          "MinorGCTime",
-          "MajorGCCount",
-          "MajorGCTime"
+          "ProcessTreeOtherRSSMemory"
         )
         names.foreach { name =>
-          sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n")
+          sb.append(s"$prefix${name}_bytes$labels ${m.getMetricValue(name)}\n")
+        }
+        Seq("MinorGCCount", "MajorGCCount").foreach { name =>
+          sb.append(s"$prefix${name}_total$labels ${m.getMetricValue(name)}\n")
+        }
+        Seq("MinorGCTime", "MajorGCTime").foreach { name =>
+          sb.append(s"$prefix${name}_seconds_total$labels 
${m.getMetricValue(name) * 0.001}\n")
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to