Revert "remedy the line-wrap while exceeding 100 chars" This reverts commit 892fb8ffa85016a63d7d00dd6f1abc58ccf034a2.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a2af6b54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a2af6b54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a2af6b54 Branch: refs/heads/master Commit: a2af6b543a0a70d94a451c9022deea181d04f8e8 Parents: 892fb8f Author: Grace Huang <jie.hu...@intel.com> Authored: Tue Oct 8 17:44:56 2013 +0800 Committer: Grace Huang <jie.hu...@intel.com> Committed: Tue Oct 8 17:44:56 2013 +0800 ---------------------------------------------------------------------- .../spark/deploy/master/ApplicationSource.scala | 13 ++-- .../spark/deploy/master/MasterSource.scala | 19 +++--- .../spark/deploy/worker/WorkerSource.scala | 31 +++++---- .../apache/spark/executor/ExecutorSource.scala | 45 +++++-------- .../spark/scheduler/DAGSchedulerSource.scala | 31 +++++---- .../spark/storage/BlockManagerSource.scala | 66 +++++++++----------- 6 files changed, 91 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a2af6b54/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index f0b1f77..c72322e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -31,13 +31,12 @@ class ApplicationSource(val application: ApplicationInfo) extends Source { override def getValue: String = application.state.toString }) - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), - new Gauge[Long] { override def getValue: Long = application.duration }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] { + override def getValue: Long = application.duration + }) - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), - new Gauge[Int] { override def getValue: Int = application.coresGranted }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] { + override def getValue: Int = application.coresGranted + }) } - http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a2af6b54/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index 8a88fef..de39398 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -27,18 +27,17 @@ private[spark] class MasterSource(val master: Master) extends Source { val sourceName = "master" // Gauge for worker numbers in cluster - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), - new Gauge[Int] { override def getValue: Int = master.workers.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] { + override def getValue: Int = master.workers.size + }) // Gauge for application numbers in cluster - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), - new Gauge[Int] { override def getValue: Int = master.apps.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] { + override def getValue: Int = master.apps.size + }) // Gauge for waiting application numbers in cluster - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), - new Gauge[Int] { override def getValue: Int = master.waitingApps.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] { + override def getValue: Int = master.waitingApps.size + }) } - http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a2af6b54/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index 0596f14..fc4f4ae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -26,28 +26,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source { val sourceName = "worker" val metricRegistry = new MetricRegistry() - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), - new Gauge[Int] { override def getValue: Int = worker.executors.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] { + override def getValue: Int = worker.executors.size + }) // Gauge for cores used of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), - new Gauge[Int] { override def getValue: Int = worker.coresUsed }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] { + override def getValue: Int = worker.coresUsed + }) // Gauge for memory used of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), - new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] { + override def getValue: Int = worker.memoryUsed + }) // Gauge for cores free of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), - new Gauge[Int] { override def getValue: Int = worker.coresFree }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] { + override def getValue: Int = worker.coresFree + }) // Gauge for memory free of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), - new Gauge[Int] { override def getValue: Int = worker.memoryFree }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] { + override def getValue: Int = worker.memoryFree + }) } - http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a2af6b54/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index d063e4a..6cbd154 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -44,42 +44,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), - new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] { + override def getValue: Int = executor.threadPool.getActiveCount() + }) // Gauge for executor thread pool's approximate total number of tasks that have been completed - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), - new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] { + override def getValue: Long = executor.threadPool.getCompletedTaskCount() + }) // Gauge for executor thread pool's current number of threads - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), - new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] { + override def getValue: Int = executor.threadPool.getPoolSize() + }) // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), - new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] { + override def getValue: Int = executor.threadPool.getMaximumPoolSize() + }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("read", "bytes"), - _.getBytesRead(), 0L) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("write", "bytes"), - _.getBytesWritten(), 0L) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("read", "ops"), - _.getReadOps(), 0) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("largeRead", "ops"), - _.getLargeReadOps(), 0) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("write", "ops"), - _.getWriteOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0) } } - http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a2af6b54/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 02fb807..9e90a08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -28,24 +28,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.DAGScheduler".format(sc.appName) - metricRegistry.register( - MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size }) + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.failed.size + }) - metricRegistry.register( - MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.running.size }) + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.running.size + }) - metricRegistry.register( - MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size }) + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.waiting.size + }) - metricRegistry.register( - MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() }) + metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.nextJobId.get() + }) - metricRegistry.register( - MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size }) + metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.activeJobs.size + }) } - http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a2af6b54/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index fcf9da4..4312250 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -29,48 +29,40 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.BlockManager".format(sc.appName) - metricRegistry.register( - MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), - new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) - maxMem / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) + maxMem / 1024 / 1024 + } }) - metricRegistry.register( - MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), - new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) - remainingMem / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) + remainingMem / 1024 / 1024 + } }) - metricRegistry.register( - MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), - new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) - (maxMem - remainingMem) / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) + (maxMem - remainingMem) / 1024 / 1024 + } }) - metricRegistry.register( - MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val diskSpaceUsed = storageStatusList - .flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_ + _) - .getOrElse(0L) - - diskSpaceUsed / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val diskSpaceUsed = storageStatusList + .flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_ + _) + .getOrElse(0L) + + diskSpaceUsed / 1024 / 1024 + } }) } -