http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala new file mode 100644 index 0000000..07b224f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status.api.v1 + +import javax.ws.rs.{PathParam, GET, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.ui.SparkUI + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class OneRDDResource(ui: SparkUI) { + + @GET + def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = { + AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse( + throw new NotFoundException(s"no rdd found w/ id $rddId") + ) + } + +}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala new file mode 100644 index 0000000..fd24aea --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status.api.v1 + +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +import org.apache.spark.SparkException +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.status.api.v1.StageStatus._ +import org.apache.spark.status.api.v1.TaskSorting._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.jobs.JobProgressListener +import org.apache.spark.ui.jobs.UIData.StageUIData + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class OneStageResource(ui: SparkUI) { + + @GET + @Path("") + def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = { + withStage(stageId){ stageAttempts => + stageAttempts.map { stage => + AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, + includeDetails = true) + } + } + } + + @GET + @Path("/{stageAttemptId: \\d+}") + def oneAttemptData( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { + withStageAttempt(stageId, stageAttemptId) { stage => + AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, + includeDetails = true) + } + } + + @GET + @Path("/{stageAttemptId: \\d+}/taskSummary") + def taskSummary( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int, + @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String) + : TaskMetricDistributions = { + withStageAttempt(stageId, stageAttemptId) { stage => + val quantiles = quantileString.split(",").map { s => + try { + s.toDouble + } catch { + case nfe: NumberFormatException => + throw new BadParameterException("quantiles", "double", s) + } + } + AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles) + } + } + + @GET + @Path("/{stageAttemptId: \\d+}/taskList") + def taskList( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int, + @DefaultValue("0") @QueryParam("offset") offset: Int, + @DefaultValue("20") @QueryParam("length") length: Int, + @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { + withStageAttempt(stageId, stageAttemptId) { stage => + val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq + .sorted(OneStageResource.ordering(sortBy)) + tasks.slice(offset, offset + length) + } + } + + private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData) + + private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = { + val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId) + if (stageAttempts.isEmpty) { + throw new NotFoundException("unknown stage: " + stageId) + } else { + f(stageAttempts) + } + } + + private def findStageStatusUIData( + listener: JobProgressListener, + stageId: Int): Seq[StageStatusInfoUi] = { + listener.synchronized { + def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = { + infos.filter { _.stageId == stageId }.map { info => + val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId), + // this is an internal error -- we should always have uiData + throw new SparkException( + s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}") + ) + StageStatusInfoUi(status, info, ui) + } + } + getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++ + getStatusInfoUi(COMPLETE, listener.completedStages) ++ + getStatusInfoUi(FAILED, listener.failedStages) ++ + getStatusInfoUi(PENDING, listener.pendingStages.values.toSeq) + } + } + + private def withStageAttempt[T]( + stageId: Int, + stageAttemptId: Int) + (f: StageStatusInfoUi => T): T = { + withStage(stageId) { attempts => + val oneAttempt = attempts.find { stage => stage.info.attemptId == stageAttemptId } + oneAttempt match { + case Some(stage) => + f(stage) + case None => + val stageAttempts = attempts.map { _.info.attemptId } + throw new NotFoundException(s"unknown attempt for stage $stageId. " + + s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}") + } + } + } +} + +object OneStageResource { + def ordering(taskSorting: TaskSorting): Ordering[TaskData] = { + val extractor: (TaskData => Long) = td => + taskSorting match { + case ID => td.taskId + case INCREASING_RUNTIME => td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L) + case DECREASING_RUNTIME => -td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L) + } + Ordering.by(extractor) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala new file mode 100644 index 0000000..95fbd96 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/SecurityFilter.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status.api.v1 + +import javax.ws.rs.WebApplicationException +import javax.ws.rs.core.Response + +import com.sun.jersey.spi.container.{ContainerRequest, ContainerRequestFilter} + +private[v1] class SecurityFilter extends ContainerRequestFilter with UIRootFromServletContext { + def filter(req: ContainerRequest): ContainerRequest = { + val user = Option(req.getUserPrincipal).map { _.getName }.orNull + if (uiRoot.securityManager.checkUIViewPermissions(user)) { + req + } else { + throw new WebApplicationException( + Response + .status(Response.Status.FORBIDDEN) + .entity(raw"""user "$user"is not authorized""") + .build() + ) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala new file mode 100644 index 0000000..cee2978 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status.api.v1 + +import java.text.SimpleDateFormat +import java.util.TimeZone +import javax.ws.rs.WebApplicationException +import javax.ws.rs.core.Response +import javax.ws.rs.core.Response.Status + +import scala.util.Try + +private[v1] class SimpleDateParam(val originalValue: String) { + val timestamp: Long = { + SimpleDateParam.formats.collectFirst { + case fmt if Try(fmt.parse(originalValue)).isSuccess => + fmt.parse(originalValue).getTime() + }.getOrElse( + throw new WebApplicationException( + Response + .status(Status.BAD_REQUEST) + .entity("Couldn't parse date: " + originalValue) + .build() + ) + ) + } +} + +private[v1] object SimpleDateParam { + + val formats: Seq[SimpleDateFormat] = { + + val gmtDay = new SimpleDateFormat("yyyy-MM-dd") + gmtDay.setTimeZone(TimeZone.getTimeZone("GMT")) + + Seq( + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz"), + gmtDay + ) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala new file mode 100644 index 0000000..ef3c857 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status.api.v1 + +import java.util.Date + +import scala.collection.Map + +import org.apache.spark.JobExecutionStatus + +class ApplicationInfo private[spark]( + val id: String, + val name: String, + val attempts: Seq[ApplicationAttemptInfo]) + +class ApplicationAttemptInfo private[spark]( + val attemptId: Option[String], + val startTime: Date, + val endTime: Date, + val sparkUser: String, + val completed: Boolean = false) + +class ExecutorStageSummary private[spark]( + val taskTime : Long, + val failedTasks : Int, + val succeededTasks : Int, + val inputBytes : Long, + val outputBytes : Long, + val shuffleRead : Long, + val shuffleWrite : Long, + val memoryBytesSpilled : Long, + val diskBytesSpilled : Long) + +class ExecutorSummary private[spark]( + val id: String, + val hostPort: String, + val rddBlocks: Int, + val memoryUsed: Long, + val diskUsed: Long, + val activeTasks: Int, + val failedTasks: Int, + val completedTasks: Int, + val totalTasks: Int, + val totalDuration: Long, + val totalInputBytes: Long, + val totalShuffleRead: Long, + val totalShuffleWrite: Long, + val maxMemory: Long, + val executorLogs: Map[String, String]) + +class JobData private[spark]( + val jobId: Int, + val name: String, + val description: Option[String], + val submissionTime: Option[Date], + val completionTime: Option[Date], + val stageIds: Seq[Int], + val jobGroup: Option[String], + val status: JobExecutionStatus, + val numTasks: Int, + val numActiveTasks: Int, + val numCompletedTasks: Int, + val numSkippedTasks: Int, + val numFailedTasks: Int, + val numActiveStages: Int, + val numCompletedStages: Int, + val numSkippedStages: Int, + val numFailedStages: Int) + +// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage +// page ... does anybody pay attention to it? +class RDDStorageInfo private[spark]( + val id: Int, + val name: String, + val numPartitions: Int, + val numCachedPartitions: Int, + val storageLevel: String, + val memoryUsed: Long, + val diskUsed: Long, + val dataDistribution: Option[Seq[RDDDataDistribution]], + val partitions: Option[Seq[RDDPartitionInfo]]) + +class RDDDataDistribution private[spark]( + val address: String, + val memoryUsed: Long, + val memoryRemaining: Long, + val diskUsed: Long) + +class RDDPartitionInfo private[spark]( + val blockName: String, + val storageLevel: String, + val memoryUsed: Long, + val diskUsed: Long, + val executors: Seq[String]) + +class StageData private[spark]( + val status: StageStatus, + val stageId: Int, + val attemptId: Int, + val numActiveTasks: Int , + val numCompleteTasks: Int, + val numFailedTasks: Int, + + val executorRunTime: Long, + + val inputBytes: Long, + val inputRecords: Long, + val outputBytes: Long, + val outputRecords: Long, + val shuffleReadBytes: Long, + val shuffleReadRecords: Long, + val shuffleWriteBytes: Long, + val shuffleWriteRecords: Long, + val memoryBytesSpilled: Long, + val diskBytesSpilled: Long, + + val name: String, + val details: String, + val schedulingPool: String, + + val accumulatorUpdates: Seq[AccumulableInfo], + val tasks: Option[Map[Long, TaskData]], + val executorSummary:Option[Map[String,ExecutorStageSummary]]) + +class TaskData private[spark]( + val taskId: Long, + val index: Int, + val attempt: Int, + val launchTime: Date, + val executorId: String, + val host: String, + val taskLocality: String, + val speculative: Boolean, + val accumulatorUpdates: Seq[AccumulableInfo], + val errorMessage: Option[String] = None, + val taskMetrics: Option[TaskMetrics] = None) + +class TaskMetrics private[spark]( + val executorDeserializeTime: Long, + val executorRunTime: Long, + val resultSize: Long, + val jvmGcTime: Long, + val resultSerializationTime: Long, + val memoryBytesSpilled: Long, + val diskBytesSpilled: Long, + val inputMetrics: Option[InputMetrics], + val outputMetrics: Option[OutputMetrics], + val shuffleReadMetrics: Option[ShuffleReadMetrics], + val shuffleWriteMetrics: Option[ShuffleWriteMetrics]) + +class InputMetrics private[spark]( + val bytesRead: Long, + val recordsRead: Long) + +class OutputMetrics private[spark]( + val bytesWritten: Long, + val recordsWritten: Long) + +class ShuffleReadMetrics private[spark]( + val remoteBlocksFetched: Int, + val localBlocksFetched: Int, + val fetchWaitTime: Long, + val remoteBytesRead: Long, + val totalBlocksFetched: Int, + val recordsRead: Long) + +class ShuffleWriteMetrics private[spark]( + val bytesWritten: Long, + val writeTime: Long, + val recordsWritten: Long) + +class TaskMetricDistributions private[spark]( + val quantiles: IndexedSeq[Double], + + val executorDeserializeTime: IndexedSeq[Double], + val executorRunTime: IndexedSeq[Double], + val resultSize: IndexedSeq[Double], + val jvmGcTime: IndexedSeq[Double], + val resultSerializationTime: IndexedSeq[Double], + val memoryBytesSpilled: IndexedSeq[Double], + val diskBytesSpilled: IndexedSeq[Double], + + val inputMetrics: Option[InputMetricDistributions], + val outputMetrics: Option[OutputMetricDistributions], + val shuffleReadMetrics: Option[ShuffleReadMetricDistributions], + val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions]) + +class InputMetricDistributions private[spark]( + val bytesRead: IndexedSeq[Double], + val recordsRead: IndexedSeq[Double]) + +class OutputMetricDistributions private[spark]( + val bytesWritten: IndexedSeq[Double], + val recordsWritten: IndexedSeq[Double]) + +class ShuffleReadMetricDistributions private[spark]( + val readBytes: IndexedSeq[Double], + val readRecords: IndexedSeq[Double], + val remoteBlocksFetched: IndexedSeq[Double], + val localBlocksFetched: IndexedSeq[Double], + val fetchWaitTime: IndexedSeq[Double], + val remoteBytesRead: IndexedSeq[Double], + val totalBlocksFetched: IndexedSeq[Double]) + +class ShuffleWriteMetricDistributions private[spark]( + val writeBytes: IndexedSeq[Double], + val writeRecords: IndexedSeq[Double], + val writeTime: IndexedSeq[Double]) + +class AccumulableInfo private[spark]( + val id: Long, + val name: String, + val update: Option[String], + val value: String) http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 7d75929..ec71148 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -25,13 +25,17 @@ import org.apache.spark.scheduler._ /** * :: DeveloperApi :: * A SparkListener that maintains executor storage status. + * + * This class is thread-safe (unlike JobProgressListener) */ @DeveloperApi class StorageStatusListener extends SparkListener { // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE) private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() - def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq + def storageStatusList: Seq[StorageStatus] = synchronized { + executorIdToStorageStatus.values.toSeq + } /** Update storage status list to reflect updated block statuses */ private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index a5271f0..bfe4a18 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -17,6 +17,9 @@ package org.apache.spark.ui +import java.util.Date + +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener @@ -33,7 +36,7 @@ import org.apache.spark.ui.scope.RDDOperationGraphListener private[spark] class SparkUI private ( val sc: Option[SparkContext], val conf: SparkConf, - val securityManager: SecurityManager, + securityManager: SecurityManager, val environmentListener: EnvironmentListener, val storageStatusListener: StorageStatusListener, val executorsListener: ExecutorsListener, @@ -41,22 +44,27 @@ private[spark] class SparkUI private ( val storageListener: StorageListener, val operationGraphListener: RDDOperationGraphListener, var appName: String, - val basePath: String) + val basePath: String, + val startTime: Long) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") - with Logging { + with Logging + with UIRoot { val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) + + val stagesTab = new StagesTab(this) + /** Initialize all components of the server. */ def initialize() { attachTab(new JobsTab(this)) - val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath)) + attachHandler(JsonRootResource.getJsonServlet(this)) // This should be POST only, but, the YARN AM proxy won't proxy POSTs attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages", stagesTab.handleKillRequest, @@ -83,6 +91,24 @@ private[spark] class SparkUI private ( private[spark] def appUIHostPort = publicHostName + ":" + boundPort private[spark] def appUIAddress = s"http://$appUIHostPort" + + def getSparkUI(appId: String): Option[SparkUI] = { + if (appId == appName) Some(this) else None + } + + def getApplicationInfoList: Iterator[ApplicationInfo] = { + Iterator(new ApplicationInfo( + id = appName, + name = appName, + attempts = Seq(new ApplicationAttemptInfo( + attemptId = None, + startTime = new Date(startTime), + endTime = new Date(-1), + sparkUser = "", + completed = false + )) + )) + } } private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) @@ -109,9 +135,10 @@ private[spark] object SparkUI { listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, - appName: String): SparkUI = { + appName: String, + startTime: Long): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, - jobProgressListener = Some(jobProgressListener)) + jobProgressListener = Some(jobProgressListener), startTime = startTime) } def createHistoryUI( @@ -119,8 +146,9 @@ private[spark] object SparkUI { listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, - basePath: String): SparkUI = { - create(None, conf, listenerBus, securityManager, appName, basePath) + basePath: String, + startTime: Long): SparkUI = { + create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) } /** @@ -137,7 +165,8 @@ private[spark] object SparkUI { securityManager: SecurityManager, appName: String, basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None): SparkUI = { + jobProgressListener: Option[JobProgressListener] = None, + startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) @@ -159,6 +188,6 @@ private[spark] object SparkUI { new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath) + appName, basePath, startTime) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/WebUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index f9860d1..384f2ad 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -37,7 +37,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. */ private[spark] abstract class WebUI( - securityManager: SecurityManager, + val securityManager: SecurityManager, port: Int, conf: SparkConf, basePath: String = "", @@ -77,15 +77,9 @@ private[spark] abstract class WebUI( val pagePath = "/" + page.prefix val renderHandler = createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, basePath) - val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json", - (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath) attachHandler(renderHandler) - attachHandler(renderJsonHandler) pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) .append(renderHandler) - pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) - .append(renderJsonHandler) - } /** Attach a handler to this UI. */ http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 956608d..b247e4c 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -22,11 +22,11 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.status.api.v1.ExecutorSummary import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils -/** Summary information about an executor to display in the UI. */ -// Needs to be private[ui] because of a false positive MiMa failure. +// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive private[ui] case class ExecutorSummaryInfo( id: String, hostPort: String, @@ -44,6 +44,7 @@ private[ui] case class ExecutorSummaryInfo( maxMemory: Long, executorLogs: Map[String, String]) + private[ui] class ExecutorsPage( parent: ExecutorsTab, threadDumpEnabled: Boolean) @@ -55,7 +56,8 @@ private[ui] class ExecutorsPage( val maxMem = storageStatusList.map(_.maxMem).sum val memUsed = storageStatusList.map(_.memUsed).sum val diskUsed = storageStatusList.map(_.diskUsed).sum - val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) + val execInfo = for (statusId <- 0 until storageStatusList.size) yield + ExecutorsPage.getExecInfo(listener, statusId) val execInfoSorted = execInfo.sortBy(_.id) val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty @@ -111,7 +113,7 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = { + private def execRow(info: ExecutorSummary, logsExist: Boolean): Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed @@ -170,8 +172,11 @@ private[ui] class ExecutorsPage( </tr> } +} + +private[spark] object ExecutorsPage { /** Represent an executor's info as a map given a storage status index */ - private def getExecInfo(statusId: Int): ExecutorSummaryInfo = { + def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort @@ -189,7 +194,7 @@ private[ui] class ExecutorsPage( val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty) - new ExecutorSummaryInfo( + new ExecutorSummary( execId, hostPort, rddBlocks, http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index f6abf27..09323d1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -271,6 +271,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val shouldShowCompletedJobs = completedJobs.nonEmpty val shouldShowFailedJobs = failedJobs.nonEmpty + val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) { + s"${completedJobs.size}" + } else { + s"${listener.numCompletedJobs}, only showing ${completedJobs.size}" + } + val summary: NodeSeq = <div> <ul class="unstyled"> @@ -295,9 +301,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } { if (shouldShowCompletedJobs) { - <li> + <li id="completed-summary"> <a href="#completed"><strong>Completed Jobs:</strong></a> - {completedJobs.size} + {completedJobNumStr} </li> } } @@ -305,7 +311,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { if (shouldShowFailedJobs) { <li> <a href="#failed"><strong>Failed Jobs:</strong></a> - {failedJobs.size} + {listener.numFailedJobs} </li> } } @@ -322,7 +328,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { activeJobsTable } if (shouldShowCompletedJobs) { - content ++= <h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++ + content ++= <h4 id="completed">Completed Jobs ({completedJobNumStr})</h4> ++ completedJobsTable } if (shouldShowFailedJobs) { http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 236bc8e..a37f739 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -64,6 +64,12 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val shouldShowCompletedStages = completedStages.nonEmpty val shouldShowFailedStages = failedStages.nonEmpty + val completedStageNumStr = if (numCompletedStages == completedStages.size) { + s"$numCompletedStages" + } else { + s"$numCompletedStages, only showing ${completedStages.size}" + } + val summary: NodeSeq = <div> <ul class="unstyled"> @@ -98,9 +104,9 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { } { if (shouldShowCompletedStages) { - <li> + <li id="completed-summary"> <a href="#completed"><strong>Completed Stages:</strong></a> - {numCompletedStages} + {completedStageNumStr} </li> } } @@ -132,7 +138,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { pendingStagesTable.toNodeSeq } if (shouldShowCompletedStages) { - content ++= <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++ + content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++ completedStagesTable.toNodeSeq } if (shouldShowFailedStages) { http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 96cc3d7..7163217 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -187,7 +187,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { val jobDataOption = listener.jobIdToData.get(jobId) if (jobDataOption.isEmpty) { val content = - <div> + <div id="no-info"> <p>No information to display for job {jobId}</p> </div> return UIUtils.headerSparkPage( http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 8f9aa9f..246e191 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -74,6 +74,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // JobProgressListener's retention limits. var numCompletedStages = 0 var numFailedStages = 0 + var numCompletedJobs = 0 + var numFailedJobs = 0 // Misc: val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]() @@ -217,10 +219,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { completedJobs += jobData trimJobsIfNecessary(completedJobs) jobData.status = JobExecutionStatus.SUCCEEDED + numCompletedJobs += 1 case JobFailed(exception) => failedJobs += jobData trimJobsIfNecessary(failedJobs) jobData.status = JobExecutionStatus.FAILED + numFailedJobs += 1 } for (stageId <- jobData.stageIds) { stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index d725b9d..f3e0b38 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.scheduler.{Schedulable, StageInfo} +import org.apache.spark.scheduler.StageInfo import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 5793100..89d175b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -25,11 +25,11 @@ import scala.xml.{Elem, Node, Unparsed} import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.ui.scope.RDDOperationGraph import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { @@ -48,14 +48,22 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val stageAttemptId = parameterAttempt.toInt val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId)) - if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) { + val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)" + if (stageDataOption.isEmpty) { + val content = + <div id="no-info"> + <p>No information to display for Stage {stageId} (Attempt {stageAttemptId})</p> + </div> + return UIUtils.headerSparkPage(stageHeader, content, parent) + + } + if (stageDataOption.get.taskData.isEmpty) { val content = <div> <h4>Summary Metrics</h4> No tasks have started yet <h4>Tasks</h4> No tasks have started yet </div> - return UIUtils.headerSparkPage( - s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent) + return UIUtils.headerSparkPage(stageHeader, content, parent) } val stageData = stageDataOption.get @@ -446,8 +454,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { maybeAccumulableTable ++ <h4>Tasks</h4> ++ taskTable - UIUtils.headerSparkPage( - "Details for Stage %d".format(stageId), content, parent, showVisualization = true) + UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 199f731..05f94a7 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -21,8 +21,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo} +import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ @@ -32,28 +32,19 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { def render(request: HttpServletRequest): Seq[Node] = { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") - val rddId = parameterId.toInt - val storageStatusList = listener.storageStatusList - val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse { - // Rather than crashing, render an "RDD Not Found" page - return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) - } + val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,includeDetails = true) + .getOrElse { + // Rather than crashing, render an "RDD Not Found" page + return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) + } // Worker table - val workers = storageStatusList.map((rddId, _)) - val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers, - id = Some("rdd-storage-by-worker-table")) + val workerTable = UIUtils.listingTable(workerHeader, workerRow, + rddStorageInfo.dataDistribution.get, id = Some("rdd-storage-by-worker-table")) // Block table - val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList) - val blocks = storageStatusList - .flatMap(_.rddBlocksById(rddId)) - .sortWith(_._1.name < _._1.name) - .map { case (blockId, status) => - (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) - } - val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks, + val blockTable = UIUtils.listingTable(blockHeader, blockRow, rddStorageInfo.partitions.get, id = Some("rdd-storage-by-block-table")) val content = @@ -62,23 +53,23 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { <ul class="unstyled"> <li> <strong>Storage Level:</strong> - {rddInfo.storageLevel.description} + {rddStorageInfo.storageLevel} </li> <li> <strong>Cached Partitions:</strong> - {rddInfo.numCachedPartitions} + {rddStorageInfo.numCachedPartitions} </li> <li> <strong>Total Partitions:</strong> - {rddInfo.numPartitions} + {rddStorageInfo.numPartitions} </li> <li> <strong>Memory Size:</strong> - {Utils.bytesToString(rddInfo.memSize)} + {Utils.bytesToString(rddStorageInfo.memoryUsed)} </li> <li> <strong>Disk Size:</strong> - {Utils.bytesToString(rddInfo.diskSize)} + {Utils.bytesToString(rddStorageInfo.diskUsed)} </li> </ul> </div> @@ -86,19 +77,19 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { <div class="row-fluid"> <div class="span12"> - <h4> Data Distribution on {workers.size} Executors </h4> + <h4> Data Distribution on {rddStorageInfo.dataDistribution.size} Executors </h4> {workerTable} </div> </div> <div class="row-fluid"> <div class="span12"> - <h4> {blocks.size} Partitions </h4> + <h4> {rddStorageInfo.partitions.size} Partitions </h4> {blockTable} </div> </div>; - UIUtils.headerSparkPage("RDD Storage Info for " + rddInfo.name, content, parent) + UIUtils.headerSparkPage("RDD Storage Info for " + rddStorageInfo.name, content, parent) } /** Header fields for the worker table */ @@ -116,34 +107,32 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { "Executors") /** Render an HTML row representing a worker */ - private def workerRow(worker: (Int, StorageStatus)): Seq[Node] = { - val (rddId, status) = worker + private def workerRow(worker: RDDDataDistribution): Seq[Node] = { <tr> - <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> + <td>{worker.address}</td> <td> - {Utils.bytesToString(status.memUsedByRdd(rddId))} - ({Utils.bytesToString(status.memRemaining)} Remaining) + {Utils.bytesToString(worker.memoryUsed)} + ({Utils.bytesToString(worker.memoryRemaining)} Remaining) </td> - <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td> + <td>{Utils.bytesToString(worker.diskUsed)}</td> </tr> } /** Render an HTML row representing a block */ - private def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = { - val (id, block, locations) = row + private def blockRow(row: RDDPartitionInfo): Seq[Node] = { <tr> - <td>{id}</td> + <td>{row.blockName}</td> <td> - {block.storageLevel.description} + {row.storageLevel} </td> - <td sorttable_customkey={block.memSize.toString}> - {Utils.bytesToString(block.memSize)} + <td sorttable_customkey={row.memoryUsed.toString}> + {Utils.bytesToString(row.memoryUsed)} </td> - <td sorttable_customkey={block.diskSize.toString}> - {Utils.bytesToString(block.diskSize)} + <td sorttable_customkey={row.diskUsed.toString}> + {Utils.bytesToString(row.diskUsed)} </td> <td> - {locations.map(l => <span>{l}<br/></span>)} + {row.executors.map(l => <span>{l}<br/></span>)} </td> </tr> } http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 59dc6b5..07db783 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 045bd78..0351749 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -35,6 +35,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag /** * :: DeveloperApi :: * A SparkListener that prepares information to be displayed on the BlockManagerUI. + * + * This class is thread-safe (unlike JobProgressListener) */ @DeveloperApi class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener { @@ -43,7 +45,9 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList /** Filter RDD info to include only those with cached partitions */ - def rddInfoList: Seq[RDDInfo] = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq + def rddInfoList: Seq[RDDInfo] = synchronized { + _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq + } /** Update the storage info of the RDDs whose blocks are among the given updated blocks */ private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/json_expectation new file mode 100644 index 0000000..6101177 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/json_expectation @@ -0,0 +1,53 @@ +[ { + "id" : "local-1427397477963", + "name" : "Spark shell", + "attempts" : [ { + "startTime" : "2015-03-26T19:17:57.184GMT", + "endTime" : "2015-03-26T19:20:02.949GMT", + "sparkUser" : "irashid", + "completed" : true + } ] +}, { + "id" : "local-1426533911241", + "name" : "Spark shell", + "attempts" : [ { + "attemptId" : "2", + "startTime" : "2015-03-17T23:11:50.242GMT", + "endTime" : "2015-03-17T23:12:25.177GMT", + "sparkUser" : "irashid", + "completed" : true + }, { + "attemptId" : "1", + "startTime" : "2015-03-16T19:25:10.242GMT", + "endTime" : "2015-03-16T19:25:45.177GMT", + "sparkUser" : "irashid", + "completed" : true + } ] +}, { + "id" : "local-1425081759269", + "name" : "Spark shell", + "attempts" : [ { + "startTime" : "2015-02-28T00:02:38.277GMT", + "endTime" : "2015-02-28T00:02:46.912GMT", + "sparkUser" : "irashid", + "completed" : true + } ] +}, { + "id" : "local-1422981780767", + "name" : "Spark shell", + "attempts" : [ { + "startTime" : "2015-02-03T16:42:59.720GMT", + "endTime" : "2015-02-03T16:43:08.731GMT", + "sparkUser" : "irashid", + "completed" : true + } ] +}, { + "id" : "local-1422981759269", + "name" : "Spark shell", + "attempts" : [ { + "startTime" : "2015-02-03T16:42:38.277GMT", + "endTime" : "2015-02-03T16:42:46.912GMT", + "sparkUser" : "irashid", + "completed" : true + } ] +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation new file mode 100644 index 0000000..cb622e1 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/executors/json_expectation @@ -0,0 +1,17 @@ +[ { + "id" : "<driver>", + "hostPort" : "localhost:57971", + "rddBlocks" : 8, + "memoryUsed" : 28000128, + "diskUsed" : 0, + "activeTasks" : 0, + "failedTasks" : 1, + "completedTasks" : 31, + "totalTasks" : 32, + "totalDuration" : 8820, + "totalInputBytes" : 28000288, + "totalShuffleRead" : 0, + "totalShuffleWrite" : 13180, + "maxMemory" : 278302556, + "executorLogs" : { } +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation new file mode 100644 index 0000000..4a29072 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/0/json_expectation @@ -0,0 +1,15 @@ +{ + "jobId" : 0, + "name" : "count at <console>:15", + "stageIds" : [ 0 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation new file mode 100644 index 0000000..cab4750 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation @@ -0,0 +1,43 @@ +[ { + "jobId" : 2, + "name" : "count at <console>:17", + "stageIds" : [ 3 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +}, { + "jobId" : 1, + "name" : "count at <console>:20", + "stageIds" : [ 1, 2 ], + "status" : "FAILED", + "numTasks" : 16, + "numActiveTasks" : 0, + "numCompletedTasks" : 15, + "numSkippedTasks" : 15, + "numFailedTasks" : 1, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 1 +}, { + "jobId" : 0, + "name" : "count at <console>:15", + "stageIds" : [ 0 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation new file mode 100644 index 0000000..cab4750 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation @@ -0,0 +1,43 @@ +[ { + "jobId" : 2, + "name" : "count at <console>:17", + "stageIds" : [ 3 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +}, { + "jobId" : 1, + "name" : "count at <console>:20", + "stageIds" : [ 1, 2 ], + "status" : "FAILED", + "numTasks" : 16, + "numActiveTasks" : 0, + "numCompletedTasks" : 15, + "numSkippedTasks" : 15, + "numFailedTasks" : 1, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 1 +}, { + "jobId" : 0, + "name" : "count at <console>:15", + "stageIds" : [ 0 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation new file mode 100644 index 0000000..6fd25be --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation @@ -0,0 +1,29 @@ +[ { + "jobId" : 2, + "name" : "count at <console>:17", + "stageIds" : [ 3 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +}, { + "jobId" : 0, + "name" : "count at <console>:15", + "stageIds" : [ 0 ], + "status" : "SUCCEEDED", + "numTasks" : 8, + "numActiveTasks" : 0, + "numCompletedTasks" : 8, + "numSkippedTasks" : 8, + "numFailedTasks" : 0, + "numActiveStages" : 0, + "numCompletedStages" : 1, + "numSkippedStages" : 0, + "numFailedStages" : 0 +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation new file mode 100644 index 0000000..07489ad --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/json_expectation @@ -0,0 +1,10 @@ +{ + "id" : "local-1422981780767", + "name" : "Spark shell", + "attempts" : [ { + "startTime" : "2015-02-03T16:42:59.720GMT", + "endTime" : "2015-02-03T16:43:08.731GMT", + "sparkUser" : "irashid", + "completed" : true + } ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation new file mode 100644 index 0000000..111cb81 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/0/json_expectation @@ -0,0 +1,270 @@ +{ + "status" : "COMPLETE", + "stageId" : 1, + "attemptId" : 0, + "numActiveTasks" : 0, + "numCompleteTasks" : 8, + "numFailedTasks" : 0, + "executorRunTime" : 3476, + "inputBytes" : 28000128, + "inputRecords" : 0, + "outputBytes" : 0, + "outputRecords" : 0, + "shuffleReadBytes" : 0, + "shuffleReadRecords" : 0, + "shuffleWriteBytes" : 13180, + "shuffleWriteRecords" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "name" : "map at <console>:14", + "details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIM ain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)", + "schedulingPool" : "default", + "accumulatorUpdates" : [ ], + "tasks" : { + "8" : { + "taskId" : 8, + "index" : 0, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.829GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 1, + "executorRunTime" : 435, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 2, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 94000, + "recordsWritten" : 0 + } + } + }, + "11" : { + "taskId" : 11, + "index" : 3, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.830GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1647, + "writeTime" : 83000, + "recordsWritten" : 0 + } + } + }, + "14" : { + "taskId" : 14, + "index" : 6, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.832GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 88000, + "recordsWritten" : 0 + } + } + }, + "13" : { + "taskId" : 13, + "index" : 5, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.831GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 2, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 73000, + "recordsWritten" : 0 + } + } + }, + "10" : { + "taskId" : 10, + "index" : 2, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.830GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 76000, + "recordsWritten" : 0 + } + } + }, + "9" : { + "taskId" : 9, + "index" : 1, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.830GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 1, + "executorRunTime" : 436, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 98000, + "recordsWritten" : 0 + } + } + }, + "12" : { + "taskId" : 12, + "index" : 4, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.831GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1645, + "writeTime" : 101000, + "recordsWritten" : 0 + } + } + }, + "15" : { + "taskId" : 15, + "index" : 7, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.833GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 1, + "executorRunTime" : 435, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 79000, + "recordsWritten" : 0 + } + } + } + }, + "executorSummary" : { + "<driver>" : { + "taskTime" : 3624, + "failedTasks" : 0, + "succeededTasks" : 8, + "inputBytes" : 28000128, + "outputBytes" : 0, + "shuffleRead" : 0, + "shuffleWrite" : 13180, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation new file mode 100644 index 0000000..ef339f8 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/1/json_expectation @@ -0,0 +1,270 @@ +[ { + "status" : "COMPLETE", + "stageId" : 1, + "attemptId" : 0, + "numActiveTasks" : 0, + "numCompleteTasks" : 8, + "numFailedTasks" : 0, + "executorRunTime" : 3476, + "inputBytes" : 28000128, + "inputRecords" : 0, + "outputBytes" : 0, + "outputRecords" : 0, + "shuffleReadBytes" : 0, + "shuffleReadRecords" : 0, + "shuffleWriteBytes" : 13180, + "shuffleWriteRecords" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "name" : "map at <console>:14", + "details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIM ain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)", + "schedulingPool" : "default", + "accumulatorUpdates" : [ ], + "tasks" : { + "8" : { + "taskId" : 8, + "index" : 0, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.829GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 1, + "executorRunTime" : 435, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 2, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 94000, + "recordsWritten" : 0 + } + } + }, + "11" : { + "taskId" : 11, + "index" : 3, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.830GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1647, + "writeTime" : 83000, + "recordsWritten" : 0 + } + } + }, + "14" : { + "taskId" : 14, + "index" : 6, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.832GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 88000, + "recordsWritten" : 0 + } + } + }, + "13" : { + "taskId" : 13, + "index" : 5, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.831GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 2, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 73000, + "recordsWritten" : 0 + } + } + }, + "10" : { + "taskId" : 10, + "index" : 2, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.830GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 76000, + "recordsWritten" : 0 + } + } + }, + "9" : { + "taskId" : 9, + "index" : 1, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.830GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 1, + "executorRunTime" : 436, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 98000, + "recordsWritten" : 0 + } + } + }, + "12" : { + "taskId" : 12, + "index" : 4, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.831GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 2, + "executorRunTime" : 434, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1645, + "writeTime" : 101000, + "recordsWritten" : 0 + } + } + }, + "15" : { + "taskId" : 15, + "index" : 7, + "attempt" : 0, + "launchTime" : "2015-02-03T16:43:05.833GMT", + "executorId" : "<driver>", + "host" : "localhost", + "taskLocality" : "PROCESS_LOCAL", + "speculative" : false, + "accumulatorUpdates" : [ ], + "taskMetrics" : { + "executorDeserializeTime" : 1, + "executorRunTime" : 435, + "resultSize" : 1902, + "jvmGcTime" : 19, + "resultSerializationTime" : 1, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "inputMetrics" : { + "bytesRead" : 3500016, + "recordsRead" : 0 + }, + "shuffleWriteMetrics" : { + "bytesWritten" : 1648, + "writeTime" : 79000, + "recordsWritten" : 0 + } + } + } + }, + "executorSummary" : { + "<driver>" : { + "taskTime" : 3624, + "failedTasks" : 0, + "succeededTasks" : 8, + "inputBytes" : 28000128, + "outputBytes" : 0, + "shuffleRead" : 0, + "shuffleWrite" : 13180, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0 + } + } +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation ---------------------------------------------------------------------- diff --git a/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation new file mode 100644 index 0000000..056fac7 --- /dev/null +++ b/core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/stages/json_expectation @@ -0,0 +1,89 @@ +[ { + "status" : "COMPLETE", + "stageId" : 3, + "attemptId" : 0, + "numActiveTasks" : 0, + "numCompleteTasks" : 8, + "numFailedTasks" : 0, + "executorRunTime" : 162, + "inputBytes" : 160, + "inputRecords" : 0, + "outputBytes" : 0, + "outputRecords" : 0, + "shuffleReadBytes" : 0, + "shuffleReadRecords" : 0, + "shuffleWriteBytes" : 0, + "shuffleWriteRecords" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "name" : "count at <console>:17", + "details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line19.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)\n$line19.$read$$iwC$$iwC$$iwC.<init>(<console>:22)\n$line19.$read$$iwC$$iwC.<init>(<console>:24)\n$line19.$read$$iwC.<init>(<console>:26)\n$line19.$read.<init>(<console>:28)\n$line19.$read$.<init>(<console>:32)\n$line19.$read$.<clinit>(<console>)\n$line19.$eval$.<init>(<console>:7)\n$line19.$eval$.<clinit>(<console>)\n$line19.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.Spark IMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)", + "schedulingPool" : "default", + "accumulatorUpdates" : [ ] +}, { + "status" : "COMPLETE", + "stageId" : 1, + "attemptId" : 0, + "numActiveTasks" : 0, + "numCompleteTasks" : 8, + "numFailedTasks" : 0, + "executorRunTime" : 3476, + "inputBytes" : 28000128, + "inputRecords" : 0, + "outputBytes" : 0, + "outputRecords" : 0, + "shuffleReadBytes" : 0, + "shuffleReadRecords" : 0, + "shuffleWriteBytes" : 13180, + "shuffleWriteRecords" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "name" : "map at <console>:14", + "details" : "org.apache.spark.rdd.RDD.map(RDD.scala:271)\n$line10.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:14)\n$line10.$read$$iwC$$iwC$$iwC.<init>(<console>:19)\n$line10.$read$$iwC$$iwC.<init>(<console>:21)\n$line10.$read$$iwC.<init>(<console>:23)\n$line10.$read.<init>(<console>:25)\n$line10.$read$.<init>(<console>:29)\n$line10.$read$.<clinit>(<console>)\n$line10.$eval$.<init>(<console>:7)\n$line10.$eval$.<clinit>(<console>)\n$line10.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIM ain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)", + "schedulingPool" : "default", + "accumulatorUpdates" : [ ] +}, { + "status" : "COMPLETE", + "stageId" : 0, + "attemptId" : 0, + "numActiveTasks" : 0, + "numCompleteTasks" : 8, + "numFailedTasks" : 0, + "executorRunTime" : 4338, + "inputBytes" : 0, + "inputRecords" : 0, + "outputBytes" : 0, + "outputRecords" : 0, + "shuffleReadBytes" : 0, + "shuffleReadRecords" : 0, + "shuffleWriteBytes" : 0, + "shuffleWriteRecords" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "name" : "count at <console>:15", + "details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)\n$line9.$read$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line9.$read$$iwC$$iwC.<init>(<console>:22)\n$line9.$read$$iwC.<init>(<console>:24)\n$line9.$read.<init>(<console>:26)\n$line9.$read$.<init>(<console>:30)\n$line9.$read$.<clinit>(<console>)\n$line9.$eval$.<init>(<console>:7)\n$line9.$eval$.<clinit>(<console>)\n$line9.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.inte rpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)", + "schedulingPool" : "default", + "accumulatorUpdates" : [ ] +}, { + "status" : "FAILED", + "stageId" : 2, + "attemptId" : 0, + "numActiveTasks" : 0, + "numCompleteTasks" : 7, + "numFailedTasks" : 1, + "executorRunTime" : 278, + "inputBytes" : 0, + "inputRecords" : 0, + "outputBytes" : 0, + "outputRecords" : 0, + "shuffleReadBytes" : 0, + "shuffleReadRecords" : 0, + "shuffleWriteBytes" : 0, + "shuffleWriteRecords" : 0, + "memoryBytesSpilled" : 0, + "diskBytesSpilled" : 0, + "name" : "count at <console>:20", + "details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.Spark IMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)", + "schedulingPool" : "default", + "accumulatorUpdates" : [ ] +} ] \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
