Repository: spark
Updated Branches:
  refs/heads/master 703e6da1e -> bd2c44713


[SPARK-25394][CORE] Add an application status metrics source

- Exposes several metrics regarding application status as a source, useful to 
scrape them via jmx instead of mining the metrics rest api.  Example use case: 
prometheus + jmx exporter.
- Metrics are gathered when a job ends at the AppStatusListener side, could be 
more fine-grained but most metrics like tasks completed are also counted by 
executors. More metrics could be exposed in the future to avoid scraping 
executors in some scenarios.
- a config option `spark.app.status.metrics.enabled` is added to disable/enable 
these metrics, by default they are disabled.

This was manually tested with jmx source enabled and prometheus server on k8s:
![metrics](https://user-images.githubusercontent.com/7945591/45300945-63064d00-b518-11e8-812a-d9b4155ba0c0.png)
In the next pic the job delay is shown for repeated pi calculation (Spark 
action).
![pi](https://user-images.githubusercontent.com/7945591/45329927-89a1a380-b56b-11e8-9cc1-5e76cb83969f.png)

Closes #22381 from skonto/add_app_status_metrics.

Authored-by: Stavros Kontopoulos <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2c4471
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2c4471
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2c4471

Branch: refs/heads/master
Commit: bd2c4471311cd7e948c80b4927a903636ce0ce7e
Parents: 703e6da
Author: Stavros Kontopoulos <[email protected]>
Authored: Tue Oct 16 14:57:32 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Oct 16 14:58:26 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  7 +-
 .../apache/spark/status/AppStatusListener.scala | 33 +++++++-
 .../apache/spark/status/AppStatusSource.scala   | 85 ++++++++++++++++++++
 .../apache/spark/status/AppStatusStore.scala    |  7 +-
 .../org/apache/spark/status/LiveEntity.scala    |  2 +-
 5 files changed, 125 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0a66dae..10f3168 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -418,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize the app status store and listener before SparkEnv is created 
so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf)
+    val appStatusSource = AppStatusSource.createSource(conf)
+    _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
     listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
@@ -569,7 +570,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
-
+    appStatusSource.foreach(_env.metricsSystem.registerSource(_))
     // Make sure the context is stopped if the user forgets about it. This 
avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't 
help if the JVM
     // is killed, though.

http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 36aaf67..d52b7e8 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -44,6 +44,7 @@ private[spark] class AppStatusListener(
     kvstore: ElementTrackingStore,
     conf: SparkConf,
     live: Boolean,
+    appStatusSource: Option[AppStatusSource] = None,
     lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
 
   import config._
@@ -280,6 +281,11 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): 
Unit = {
     liveExecutors.get(execId).foreach { exec =>
       exec.isBlacklisted = blacklisted
+      if (blacklisted) {
+        appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc())
+      } else {
+        appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.inc())
+      }
       liveUpdate(exec, System.nanoTime())
     }
   }
@@ -382,11 +388,34 @@ private[spark] class AppStatusListener(
       }
 
       job.status = event.jobResult match {
-        case JobSucceeded => JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => JobExecutionStatus.FAILED
+        case JobSucceeded =>
+          appStatusSource.foreach{_.SUCCEEDED_JOBS.inc()}
+          JobExecutionStatus.SUCCEEDED
+        case JobFailed(_) =>
+          appStatusSource.foreach{_.FAILED_JOBS.inc()}
+          JobExecutionStatus.FAILED
       }
 
       job.completionTime = if (event.time > 0) Some(new Date(event.time)) else 
None
+
+      for {
+        source <- appStatusSource
+        submissionTime <- job.submissionTime
+        completionTime <- job.completionTime
+      } {
+        source.JOB_DURATION.value.set(completionTime.getTime() - 
submissionTime.getTime())
+      }
+
+      // update global app status counters
+      appStatusSource.foreach { source =>
+        source.COMPLETED_STAGES.inc(job.completedStages.size)
+        source.FAILED_STAGES.inc(job.failedStages)
+        source.COMPLETED_TASKS.inc(job.completedTasks)
+        source.FAILED_TASKS.inc(job.failedTasks)
+        source.KILLED_TASKS.inc(job.killedTasks)
+        source.SKIPPED_TASKS.inc(job.skippedTasks)
+        source.SKIPPED_STAGES.inc(job.skippedStages.size)
+      }
       update(job, now, last = true)
       if (job.status == JobExecutionStatus.SUCCEEDED) {
         appSummary = new AppSummary(appSummary.numCompletedJobs + 1, 
appSummary.numCompletedStages)

http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
new file mode 100644
index 0000000..3ab293d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+    .register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: 
MetricRegistry): Counter = {
+    metricRegistry.counter(MetricRegistry.name(prefix, name))
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+    Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+      .filter(identity)
+      .map { _ => new AppStatusSource() }
+  }
+
+  val APP_STATUS_METRICS_ENABLED =
+    ConfigBuilder("spark.app.status.metrics.enabled")
+      .doc("Whether Dropwizard/Codahale metrics " +
+        "will be reported for the status of the running spark app.")
+      .booleanConf
+      .createWithDefault(false)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 9839cbb..63b9d89 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -505,10 +505,11 @@ private[spark] object AppStatusStore {
   /**
    * Create an in-memory store for a live application.
    */
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+      conf: SparkConf,
+      appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
-    val listener = new AppStatusListener(store, conf, true)
+    val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 8708e64..8066331 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -61,7 +61,7 @@ private[spark] abstract class LiveEntity {
 private class LiveJob(
     val jobId: Int,
     name: String,
-    submissionTime: Option[Date],
+    val submissionTime: Option[Date],
     val stageIds: Seq[Int],
     jobGroup: Option[String],
     numTasks: Int) extends LiveEntity {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to