Repository: spark Updated Branches: refs/heads/master 703e6da1e -> bd2c44713
[SPARK-25394][CORE] Add an application status metrics source - Exposes several metrics regarding application status as a metrics source, which makes it possible to scrape them via JMX instead of mining the metrics REST API. Example use case: Prometheus + JMX exporter. - Metrics are gathered when a job ends, on the AppStatusListener side. This could be more fine-grained, but most metrics, like tasks completed, are also counted by executors. More metrics could be exposed in the future to avoid scraping executors in some scenarios. - A config option `spark.app.status.metrics.enabled` is added to disable/enable these metrics; by default they are disabled. This was manually tested with the JMX source enabled and a Prometheus server on k8s:  In the next picture the job delay is shown for a repeated pi calculation (Spark action).  Closes #22381 from skonto/add_app_status_metrics. Authored-by: Stavros Kontopoulos <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2c4471 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2c4471 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2c4471 Branch: refs/heads/master Commit: bd2c4471311cd7e948c80b4927a903636ce0ce7e Parents: 703e6da Author: Stavros Kontopoulos <[email protected]> Authored: Tue Oct 16 14:57:32 2018 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Tue Oct 16 14:58:26 2018 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 7 +- .../apache/spark/status/AppStatusListener.scala | 33 +++++++- .../apache/spark/status/AppStatusSource.scala | 85 ++++++++++++++++++++ .../apache/spark/status/AppStatusStore.scala | 7 +- .../org/apache/spark/status/LiveEntity.scala | 2 +- 5 files changed, 125 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0a66dae..10f3168 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusSource, AppStatusStore} import org.apache.spark.status.api.v1.ThreadStackTrace import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump @@ -418,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize the app status store and listener before SparkEnv is created so that it gets // all events. - _statusStore = AppStatusStore.createLiveStore(conf) + val appStatusSource = AppStatusSource.createSource(conf) + _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource) listenerBus.addToStatusQueue(_statusStore.listener.get) // Create the Spark execution environment (cache, map output tracker, etc) @@ -569,7 +570,7 @@ class SparkContext(config: SparkConf) extends Logging { _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } - + appStatusSource.foreach(_env.metricsSystem.registerSource(_)) // Make sure the context is stopped if the user forgets about it. This avoids leaving // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM // is killed, though. 
http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 36aaf67..d52b7e8 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -44,6 +44,7 @@ private[spark] class AppStatusListener( kvstore: ElementTrackingStore, conf: SparkConf, live: Boolean, + appStatusSource: Option[AppStatusSource] = None, lastUpdateTime: Option[Long] = None) extends SparkListener with Logging { import config._ @@ -280,6 +281,11 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { + appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc()) + } else { + appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.inc()) + } liveUpdate(exec, System.nanoTime()) } } @@ -382,11 +388,34 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { - case JobSucceeded => JobExecutionStatus.SUCCEEDED - case JobFailed(_) => JobExecutionStatus.FAILED + case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc()} + JobExecutionStatus.SUCCEEDED + case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc()} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { + source <- appStatusSource + submissionTime <- job.submissionTime + completionTime <- job.completionTime + } { + source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime()) + } + + // update global app status counters + appStatusSource.foreach { source => + 
source.COMPLETED_STAGES.inc(job.completedStages.size) + source.FAILED_STAGES.inc(job.failedStages) + source.COMPLETED_TASKS.inc(job.completedTasks) + source.FAILED_TASKS.inc(job.failedTasks) + source.KILLED_TASKS.inc(job.killedTasks) + source.SKIPPED_TASKS.inc(job.skippedTasks) + source.SKIPPED_STAGES.inc(job.skippedStages.size) + } update(job, now, last = true) if (job.status == JobExecutionStatus.SUCCEEDED) { appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala new file mode 100644 index 0000000..3ab293d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import AppStatusSource.getCounter +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override implicit val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry + .register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = getCounter("stages", "failedStages") + + val SKIPPED_STAGES = getCounter("stages", "skippedStages") + + val COMPLETED_STAGES = getCounter("stages", "completedStages") + + val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs") + + val FAILED_JOBS = getCounter("jobs", "failedJobs") + + val COMPLETED_TASKS = getCounter("tasks", "completedTasks") + + val FAILED_TASKS = getCounter("tasks", "failedTasks") + + val KILLED_TASKS = getCounter("tasks", "killedTasks") + + val SKIPPED_TASKS = getCounter("tasks", "skippedTasks") + + val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors") + + val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors") +} + +private[spark] object AppStatusSource { + + def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = { + metricRegistry.counter(MetricRegistry.name(prefix, name)) + } + + def createSource(conf: SparkConf): Option[AppStatusSource] = { + Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED)) + .filter(identity) + .map { _ => new AppStatusSource() } + } + + val APP_STATUS_METRICS_ENABLED = + 
ConfigBuilder("spark.app.status.metrics.enabled") + .doc("Whether Dropwizard/Codahale metrics " + + "will be reported for the status of the running spark app.") + .booleanConf + .createWithDefault(false) +} http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 9839cbb..63b9d89 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -505,10 +505,11 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): AppStatusStore = { val store = new ElementTrackingStore(new InMemoryStore(), conf) - val listener = new AppStatusListener(store, conf, true) + val listener = new AppStatusListener(store, conf, true, appStatusSource) new AppStatusStore(store, listener = Some(listener)) } - } http://git-wip-us.apache.org/repos/asf/spark/blob/bd2c4471/core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 8708e64..8066331 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -61,7 +61,7 @@ private[spark] abstract class LiveEntity { private class LiveJob( val jobId: Int, name: String, - submissionTime: Option[Date], + val submissionTime: Option[Date], val stageIds: Seq[Int], jobGroup: Option[String], 
numTasks: Int) extends LiveEntity { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
