Repository: spark Updated Branches: refs/heads/master bc0848b4c -> 39b3f10dd
[SPARK-20649][CORE] Simplify REST API resource structure. With the new UI store, the API resource classes have a lot less code, since there's no need for complicated translations between the UI types and the API types. So the code ended up with a bunch of files with a single method declared in them. This change re-structures the API code so that it uses fewer classes; mainly, most sub-resources were removed, and the code to deal with single-attempt and multi-attempt apps was simplified. The only functional change was the addition of a method to return a single attempt's information; that was missing in the old API, so trying to retrieve "/v1/applications/appId/attemptId" would result in a 404 even if the attempt existed (and URIs under that one would return valid data). The streaming API resources also received the same treatment, even though the data is not stored in the new UI store. Author: Marcelo Vanzin <[email protected]> Closes #19748 from vanzin/SPARK-20649. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39b3f10d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39b3f10d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39b3f10d Branch: refs/heads/master Commit: 39b3f10dda73f4a1f735f17467e5c6c45c44e977 Parents: bc0848b Author: Marcelo Vanzin <[email protected]> Authored: Wed Nov 15 15:41:53 2017 -0600 Committer: Imran Rashid <[email protected]> Committed: Wed Nov 15 15:41:53 2017 -0600 ---------------------------------------------------------------------- .../status/api/v1/AllExecutorListResource.scala | 30 --- .../spark/status/api/v1/AllJobsResource.scala | 35 ---- .../spark/status/api/v1/AllRDDResource.scala | 31 --- .../spark/status/api/v1/AllStagesResource.scala | 33 --- .../spark/status/api/v1/ApiRootResource.scala | 203 ++----------------- .../api/v1/ApplicationEnvironmentResource.scala | 32 --- .../status/api/v1/ApplicationListResource.scala | 2 +- 
.../api/v1/EventLogDownloadResource.scala | 71 ------- .../status/api/v1/ExecutorListResource.scala | 30 --- .../status/api/v1/OneApplicationResource.scala | 146 ++++++++++++- .../spark/status/api/v1/OneJobResource.scala | 38 ---- .../spark/status/api/v1/OneRDDResource.scala | 38 ---- .../spark/status/api/v1/OneStageResource.scala | 89 -------- .../spark/status/api/v1/StagesResource.scala | 97 +++++++++ .../spark/status/api/v1/VersionResource.scala | 30 --- .../api/v1/streaming/AllBatchesResource.scala | 78 ------- .../streaming/AllOutputOperationsResource.scala | 66 ------ .../api/v1/streaming/AllReceiversResource.scala | 76 ------- .../api/v1/streaming/ApiStreamingApp.scala | 31 ++- .../v1/streaming/ApiStreamingRootResource.scala | 172 +++++++++++++--- .../api/v1/streaming/OneBatchResource.scala | 35 ---- .../streaming/OneOutputOperationResource.scala | 39 ---- .../api/v1/streaming/OneReceiverResource.scala | 35 ---- .../streaming/StreamingStatisticsResource.scala | 64 ------ 24 files changed, 425 insertions(+), 1076 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala deleted file mode 100644 index 5522f4c..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllExecutorListResource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllExecutorListResource(ui: SparkUI) { - - @GET - def executorList(): Seq[ExecutorSummary] = ui.store.executorList(false) - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala deleted file mode 100644 index b4fa3e6..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.{Arrays, Date, List => JList} -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.JobExecutionStatus -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.JobUIData - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllJobsResource(ui: SparkUI) { - - @GET - def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = { - ui.store.jobsList(statuses) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala deleted file mode 100644 index 2189e1d..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils} -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllRDDResource(ui: SparkUI) { - - @GET - def rddList(): Seq[RDDStorageInfo] = ui.store.rddList() - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala deleted file mode 100644 index e1c91cb..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.{List => JList} -import javax.ws.rs.{GET, Produces, QueryParam} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllStagesResource(ui: SparkUI) { - - @GET - def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { - ui.store.stageList(statuses) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index 9d38330..ed9bdc6 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -44,189 +44,14 @@ import org.apache.spark.ui.SparkUI private[v1] class ApiRootResource extends ApiRequestContext { @Path("applications") - def getApplicationList(): ApplicationListResource = { - new ApplicationListResource(uiRoot) - } + def applicationList(): Class[ApplicationListResource] = classOf[ApplicationListResource] @Path("applications/{appId}") - def getApplication(): OneApplicationResource = { - new OneApplicationResource(uiRoot) - } - - @Path("applications/{appId}/{attemptId}/jobs") - def getJobs( - @PathParam("appId") appId: String, - @PathParam("attemptId") 
attemptId: String): AllJobsResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllJobsResource(ui) - } - } - - @Path("applications/{appId}/jobs") - def getJobs(@PathParam("appId") appId: String): AllJobsResource = { - withSparkUI(appId, None) { ui => - new AllJobsResource(ui) - } - } - - @Path("applications/{appId}/jobs/{jobId: \\d+}") - def getJob(@PathParam("appId") appId: String): OneJobResource = { - withSparkUI(appId, None) { ui => - new OneJobResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}") - def getJob( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneJobResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new OneJobResource(ui) - } - } - - @Path("applications/{appId}/executors") - def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = { - withSparkUI(appId, None) { ui => - new ExecutorListResource(ui) - } - } - - @Path("applications/{appId}/allexecutors") - def getAllExecutors(@PathParam("appId") appId: String): AllExecutorListResource = { - withSparkUI(appId, None) { ui => - new AllExecutorListResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/executors") - def getExecutors( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ExecutorListResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new ExecutorListResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/allexecutors") - def getAllExecutors( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllExecutorListResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllExecutorListResource(ui) - } - } - - @Path("applications/{appId}/stages") - def getStages(@PathParam("appId") appId: String): AllStagesResource = { - withSparkUI(appId, None) { ui => - new AllStagesResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/stages") - def getStages( - @PathParam("appId") appId: String, - 
@PathParam("attemptId") attemptId: String): AllStagesResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllStagesResource(ui) - } - } - - @Path("applications/{appId}/stages/{stageId: \\d+}") - def getStage(@PathParam("appId") appId: String): OneStageResource = { - withSparkUI(appId, None) { ui => - new OneStageResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}") - def getStage( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneStageResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new OneStageResource(ui) - } - } - - @Path("applications/{appId}/storage/rdd") - def getRdds(@PathParam("appId") appId: String): AllRDDResource = { - withSparkUI(appId, None) { ui => - new AllRDDResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/storage/rdd") - def getRdds( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllRDDResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new AllRDDResource(ui) - } - } - - @Path("applications/{appId}/storage/rdd/{rddId: \\d+}") - def getRdd(@PathParam("appId") appId: String): OneRDDResource = { - withSparkUI(appId, None) { ui => - new OneRDDResource(ui) - } - } - - @Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}") - def getRdd( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): OneRDDResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new OneRDDResource(ui) - } - } - - @Path("applications/{appId}/logs") - def getEventLogs( - @PathParam("appId") appId: String): EventLogDownloadResource = { - try { - // withSparkUI will throw NotFoundException if attemptId exists for this application. - // So we need to try again with attempt id "1". 
- withSparkUI(appId, None) { _ => - new EventLogDownloadResource(uiRoot, appId, None) - } - } catch { - case _: NotFoundException => - withSparkUI(appId, Some("1")) { _ => - new EventLogDownloadResource(uiRoot, appId, None) - } - } - } - - @Path("applications/{appId}/{attemptId}/logs") - def getEventLogs( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): EventLogDownloadResource = { - withSparkUI(appId, Some(attemptId)) { _ => - new EventLogDownloadResource(uiRoot, appId, Some(attemptId)) - } - } + def application(): Class[OneApplicationResource] = classOf[OneApplicationResource] @Path("version") - def getVersion(): VersionResource = { - new VersionResource(uiRoot) - } - - @Path("applications/{appId}/environment") - def getEnvironment(@PathParam("appId") appId: String): ApplicationEnvironmentResource = { - withSparkUI(appId, None) { ui => - new ApplicationEnvironmentResource(ui) - } - } + def version(): VersionInfo = new VersionInfo(org.apache.spark.SPARK_VERSION) - @Path("applications/{appId}/{attemptId}/environment") - def getEnvironment( - @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ApplicationEnvironmentResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new ApplicationEnvironmentResource(ui) - } - } } private[spark] object ApiRootResource { @@ -293,23 +118,29 @@ private[v1] trait ApiRequestContext { def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext) +} - /** - * Get the spark UI with the given appID, and apply a function - * to it. If there is no such app, throw an appropriate exception - */ - def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { +/** + * Base class for resource handlers that use app-specific data. Abstracts away dealing with + * application and attempt IDs, and finding the app's UI. 
+ */ +private[v1] trait BaseAppResource extends ApiRequestContext { + + @PathParam("appId") protected[this] var appId: String = _ + @PathParam("attemptId") protected[this] var attemptId: String = _ + + protected def withUI[T](fn: SparkUI => T): T = { try { - uiRoot.withSparkUI(appId, attemptId) { ui => + uiRoot.withSparkUI(appId, Option(attemptId)) { ui => val user = httpRequest.getRemoteUser() if (!ui.securityManager.checkUIViewPermissions(user)) { throw new ForbiddenException(raw"""user "$user" is not authorized""") } - f(ui) + fn(ui) } } catch { case _: NoSuchElementException => - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) + val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId) throw new NotFoundException(s"no such app: $appKey") } } http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala deleted file mode 100644 index e702f8a..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationEnvironmentResource.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ApplicationEnvironmentResource(ui: SparkUI) { - - @GET - def getEnvironmentInfo(): ApplicationEnvironmentInfo = { - ui.store.environmentInfo() - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index f039744..91660a5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -23,7 +23,7 @@ import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo @Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ApplicationListResource(uiRoot: UIRoot) { +private[v1] class ApplicationListResource extends ApiRequestContext { @GET def appList( http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala deleted file mode 100644 index c84022d..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.status.api.v1 - -import java.io.OutputStream -import java.util.zip.ZipOutputStream -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.{MediaType, Response, StreamingOutput} - -import scala.util.control.NonFatal - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging - -@Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) -private[v1] class EventLogDownloadResource( - val uIRoot: UIRoot, - val appId: String, - val attemptId: Option[String]) extends Logging { - val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf) - - @GET - def getEventLogs(): Response = { - try { - val fileName = { - attemptId match { - case Some(id) => s"eventLogs-$appId-$id.zip" - case None => s"eventLogs-$appId.zip" - } - } - - val stream = new StreamingOutput { - override def write(output: OutputStream): Unit = { - val zipStream = new ZipOutputStream(output) - try { - uIRoot.writeEventLogs(appId, attemptId, zipStream) - } finally { - zipStream.close() - } - - } - } - - Response.ok(stream) - .header("Content-Disposition", s"attachment; filename=$fileName") - .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM) - .build() - } catch { - case NonFatal(e) => - Response.serverError() - .entity(s"Event logs are not available for app: $appId.") - .status(Response.Status.SERVICE_UNAVAILABLE) - .build() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala deleted file mode 100644 index 975101c..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class ExecutorListResource(ui: SparkUI) { - - @GET - def executorList(): Seq[ExecutorSummary] = ui.store.executorList(true) - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala index 18c3e2f..bd4df07 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala @@ -16,16 +16,150 @@ */ package org.apache.spark.status.api.v1 -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType +import java.io.OutputStream +import java.util.{List => JList} +import java.util.zip.ZipOutputStream +import javax.ws.rs.{GET, Path, PathParam, 
Produces, QueryParam} +import javax.ws.rs.core.{MediaType, Response, StreamingOutput} + +import scala.util.control.NonFatal + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.ui.SparkUI @Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneApplicationResource(uiRoot: UIRoot) { +private[v1] class AbstractApplicationResource extends BaseAppResource { + + @GET + @Path("jobs") + def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = { + withUI(_.store.jobsList(statuses)) + } + + @GET + @Path("jobs/{jobId: \\d+}") + def oneJob(@PathParam("jobId") jobId: Int): JobData = withUI { ui => + try { + ui.store.job(jobId) + } catch { + case _: NoSuchElementException => + throw new NotFoundException("unknown job: " + jobId) + } + } + + @GET + @Path("executors") + def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true)) + + @GET + @Path("allexecutors") + def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false)) + + @Path("stages") + def stages(): Class[StagesResource] = classOf[StagesResource] + + @GET + @Path("storage/rdd") + def rddList(): Seq[RDDStorageInfo] = withUI(_.store.rddList()) + + @GET + @Path("storage/rdd/{rddId: \\d+}") + def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = withUI { ui => + try { + ui.store.rdd(rddId) + } catch { + case _: NoSuchElementException => + throw new NotFoundException(s"no rdd found w/ id $rddId") + } + } + + @GET + @Path("environment") + def environmentInfo(): ApplicationEnvironmentInfo = withUI(_.store.environmentInfo()) + + @GET + @Path("logs") + @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) + def getEventLogs(): Response = { + // Retrieve the UI for the application just to do access permission checks. For backwards + // compatibility, this code also tries with attemptId "1" if the UI without an attempt ID does + // not exist. 
+ try { + withUI { _ => } + } catch { + case _: NotFoundException if attemptId == null => + attemptId = "1" + withUI { _ => } + attemptId = null + } + + try { + val fileName = if (attemptId != null) { + s"eventLogs-$appId-$attemptId.zip" + } else { + s"eventLogs-$appId.zip" + } + + val stream = new StreamingOutput { + override def write(output: OutputStream): Unit = { + val zipStream = new ZipOutputStream(output) + try { + uiRoot.writeEventLogs(appId, Option(attemptId), zipStream) + } finally { + zipStream.close() + } + + } + } + + Response.ok(stream) + .header("Content-Disposition", s"attachment; filename=$fileName") + .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM) + .build() + } catch { + case NonFatal(e) => + Response.serverError() + .entity(s"Event logs are not available for app: $appId.") + .status(Response.Status.SERVICE_UNAVAILABLE) + .build() + } + } + + /** + * This method needs to be last, otherwise it clashes with the paths for the above methods + * and causes JAX-RS to not find things. 
+ */ + @Path("{attemptId}") + def applicationAttempt(): Class[OneApplicationAttemptResource] = { + if (attemptId != null) { + throw new NotFoundException(httpRequest.getRequestURI()) + } + classOf[OneApplicationAttemptResource] + } + +} + +private[v1] class OneApplicationResource extends AbstractApplicationResource { + + @GET + def getApp(): ApplicationInfo = { + val app = uiRoot.getApplicationInfo(appId) + app.getOrElse(throw new NotFoundException("unknown app: " + appId)) + } + +} + +private[v1] class OneApplicationAttemptResource extends AbstractApplicationResource { @GET - def getApp(@PathParam("appId") appId: String): ApplicationInfo = { - val apps = uiRoot.getApplicationInfo(appId) - apps.getOrElse(throw new NotFoundException("unknown app: " + appId)) + def getAttempt(): ApplicationAttemptInfo = { + uiRoot.getApplicationInfo(appId) + .flatMap { app => + app.attempts.filter(_.attemptId == attemptId).headOption + } + .getOrElse { + throw new NotFoundException(s"unknown app $appId, attempt $attemptId") + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala deleted file mode 100644 index 3ee884e..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.NoSuchElementException -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneJobResource(ui: SparkUI) { - - @GET - def oneJob(@PathParam("jobId") jobId: Int): JobData = { - try { - ui.store.job(jobId) - } catch { - case _: NoSuchElementException => - throw new NotFoundException("unknown job: " + jobId) - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala deleted file mode 100644 index ca9758c..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import java.util.NoSuchElementException -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.ui.SparkUI - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneRDDResource(ui: SparkUI) { - - @GET - def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = { - try { - ui.store.rdd(rddId) - } catch { - case _: NoSuchElementException => - throw new NotFoundException(s"no rdd found w/ id $rddId") - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala deleted file mode 100644 index 20dd73e..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -import org.apache.spark.SparkException -import org.apache.spark.scheduler.StageInfo -import org.apache.spark.status.api.v1.StageStatus._ -import org.apache.spark.status.api.v1.TaskSorting._ -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.StageUIData - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneStageResource(ui: SparkUI) { - - @GET - @Path("") - def stageData( - @PathParam("stageId") stageId: Int, - @QueryParam("details") @DefaultValue("true") details: Boolean): Seq[StageData] = { - val ret = ui.store.stageData(stageId, details = details) - if (ret.nonEmpty) { - ret - } else { - throw new NotFoundException(s"unknown stage: $stageId") - } - } - - @GET - @Path("/{stageAttemptId: \\d+}") - def oneAttemptData( - @PathParam("stageId") stageId: Int, - @PathParam("stageAttemptId") stageAttemptId: Int, - @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = { - try { - ui.store.stageAttempt(stageId, stageAttemptId, details = details) - } catch { - case _: NoSuchElementException => - throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.") - } - } - - @GET - @Path("/{stageAttemptId: \\d+}/taskSummary") - def taskSummary( - @PathParam("stageId") stageId: Int, - @PathParam("stageAttemptId") stageAttemptId: Int, - @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String) - : TaskMetricDistributions = { - val quantiles = 
quantileString.split(",").map { s => - try { - s.toDouble - } catch { - case nfe: NumberFormatException => - throw new BadParameterException("quantiles", "double", s) - } - } - - ui.store.taskSummary(stageId, stageAttemptId, quantiles) - } - - @GET - @Path("/{stageAttemptId: \\d+}/taskList") - def taskList( - @PathParam("stageId") stageId: Int, - @PathParam("stageAttemptId") stageAttemptId: Int, - @DefaultValue("0") @QueryParam("offset") offset: Int, - @DefaultValue("20") @QueryParam("length") length: Int, - @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { - ui.store.taskList(stageId, stageAttemptId, offset, length, sortBy) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala new file mode 100644 index 0000000..bd4dfe3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.status.api.v1 + +import java.util.{List => JList} +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +import org.apache.spark.SparkException +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.status.api.v1.StageStatus._ +import org.apache.spark.status.api.v1.TaskSorting._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.jobs.UIData.StageUIData + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class StagesResource extends BaseAppResource { + + @GET + def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { + withUI(_.store.stageList(statuses)) + } + + @GET + @Path("{stageId: \\d+}") + def stageData( + @PathParam("stageId") stageId: Int, + @QueryParam("details") @DefaultValue("true") details: Boolean): Seq[StageData] = { + withUI { ui => + val ret = ui.store.stageData(stageId, details = details) + if (ret.nonEmpty) { + ret + } else { + throw new NotFoundException(s"unknown stage: $stageId") + } + } + } + + @GET + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}") + def oneAttemptData( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int, + @QueryParam("details") @DefaultValue("true") details: Boolean): StageData = withUI { ui => + try { + ui.store.stageAttempt(stageId, stageAttemptId, details = details) + } catch { + case _: NoSuchElementException => + throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.") + } + } + + @GET + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskSummary") + def taskSummary( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int, + @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String) + : TaskMetricDistributions = withUI { ui => + val quantiles = quantileString.split(",").map { s => + try { + s.toDouble + } catch { + case nfe: NumberFormatException => + throw new BadParameterException("quantiles", 
"double", s) + } + } + + ui.store.taskSummary(stageId, stageAttemptId, quantiles) + } + + @GET + @Path("{stageId: \\d+}/{stageAttemptId: \\d+}/taskList") + def taskList( + @PathParam("stageId") stageId: Int, + @PathParam("stageAttemptId") stageAttemptId: Int, + @DefaultValue("0") @QueryParam("offset") offset: Int, + @DefaultValue("20") @QueryParam("length") length: Int, + @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { + withUI(_.store.taskList(stageId, stageAttemptId, offset, length, sortBy)) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala deleted file mode 100644 index 673da1c..0000000 --- a/core/src/main/scala/org/apache/spark/status/api/v1/VersionResource.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.status.api.v1 - -import javax.ws.rs._ -import javax.ws.rs.core.MediaType - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class VersionResource(ui: UIRoot) { - - @GET - def getVersionInfo(): VersionInfo = new VersionInfo( - org.apache.spark.SPARK_VERSION - ) - -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala deleted file mode 100644 index 3a51ae6..0000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList} -import javax.ws.rs.{GET, Produces, QueryParam} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.streaming.AllBatchesResource._ -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) { - - @GET - def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = { - batchInfoList(listener, statusParams).sortBy(- _.batchId) - } -} - -private[v1] object AllBatchesResource { - - def batchInfoList( - listener: StreamingJobProgressListener, - statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = { - - listener.synchronized { - val statuses = - if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams - val statusToBatches = Seq( - BatchStatus.COMPLETED -> listener.retainedCompletedBatches, - BatchStatus.QUEUED -> listener.waitingBatches, - BatchStatus.PROCESSING -> listener.runningBatches - ) - - val batchInfos = for { - (status, batches) <- statusToBatches - batch <- batches if statuses.contains(status) - } yield { - val batchId = batch.batchTime.milliseconds - val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption - - new BatchInfo( - batchId = batchId, - batchTime = new Date(batchId), - status = status.toString, - batchDuration = listener.batchDuration, - inputSize = batch.numRecords, - schedulingDelay = batch.schedulingDelay, - processingTime = batch.processingDelay, - totalDelay = batch.totalDelay, - numActiveOutputOps = batch.numActiveOutputOp, - numCompletedOutputOps = batch.numCompletedOutputOp, - numFailedOutputOps = batch.numFailedOutputOp, - numTotalOutputOps = batch.outputOperations.size, - firstFailureReason = firstFailureReason - ) - } - - 
batchInfos - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala deleted file mode 100644 index 0eb649f..0000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.Date -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._ -import org.apache.spark.streaming.Time -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) { - - @GET - def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = { - outputOperationInfoList(listener, batchId).sortBy(_.outputOpId) - } -} - -private[v1] object AllOutputOperationsResource { - - def outputOperationInfoList( - listener: StreamingJobProgressListener, - batchId: Long): Seq[OutputOperationInfo] = { - - listener.synchronized { - listener.getBatchUIData(Time(batchId)) match { - case Some(batch) => - for ((opId, op) <- batch.outputOperations) yield { - val jobIds = batch.outputOpIdSparkJobIdPairs - .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted - - new OutputOperationInfo( - outputOpId = opId, - name = op.name, - description = op.description, - startTime = op.startTime.map(new Date(_)), - endTime = op.endTime.map(new Date(_)), - duration = op.duration, - failureReason = op.failureReason, - jobIds = jobIds - ) - } - case None => throw new NotFoundException("unknown batch: " + batchId) - } - }.toSeq - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala deleted file mode 100644 index 5a276a9..0000000 --- 
a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.Date -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.streaming.AllReceiversResource._ -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) { - - @GET - def receiversList(): Seq[ReceiverInfo] = { - receiverInfoList(listener).sortBy(_.streamId) - } -} - -private[v1] object AllReceiversResource { - - def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = { - listener.synchronized { - listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) => - - val receiverInfo = listener.receiverInfo(streamId) - val streamName = receiverInfo.map(_.name) - .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId") - val avgEventRate = - if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size) - - val 
(errorTime, errorMessage, error) = receiverInfo match { - case None => (None, None, None) - case Some(info) => - val someTime = - if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None - val someMessage = - if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None - val someError = - if (info.lastError.length > 0) Some(info.lastError) else None - - (someTime, someMessage, someError) - } - - new ReceiverInfo( - streamId = streamId, - streamName = streamName, - isActive = receiverInfo.map(_.active), - executorId = receiverInfo.map(_.executorId), - executorHost = receiverInfo.map(_.location), - lastErrorTime = errorTime, - lastErrorMessage = errorMessage, - lastError = error, - avgEventRate = avgEventRate, - eventRates = eventRates - ) - }.toSeq - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala index aea75d5..07d8164 100644 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala @@ -19,24 +19,39 @@ package org.apache.spark.status.api.v1.streaming import javax.ws.rs.{Path, PathParam} -import org.apache.spark.status.api.v1.ApiRequestContext +import org.apache.spark.status.api.v1._ +import org.apache.spark.streaming.ui.StreamingJobProgressListener @Path("/v1") private[v1] class ApiStreamingApp extends ApiRequestContext { @Path("applications/{appId}/streaming") - def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = { - withSparkUI(appId, None) { ui => - new ApiStreamingRootResource(ui) - } + def 
getStreamingRoot(@PathParam("appId") appId: String): Class[ApiStreamingRootResource] = { + classOf[ApiStreamingRootResource] } @Path("applications/{appId}/{attemptId}/streaming") def getStreamingRoot( @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = { - withSparkUI(appId, Some(attemptId)) { ui => - new ApiStreamingRootResource(ui) + @PathParam("attemptId") attemptId: String): Class[ApiStreamingRootResource] = { + classOf[ApiStreamingRootResource] + } +} + +/** + * Base class for streaming API handlers, provides easy access to the streaming listener that + * holds the app's information. + */ +private[v1] trait BaseStreamingAppResource extends BaseAppResource { + + protected def withListener[T](fn: StreamingJobProgressListener => T): T = withUI { ui => + val listener = ui.getStreamingJobProgressListener match { + case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener] + case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName) + } + listener.synchronized { + fn(listener) } } + } http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala index 1ccd586..a2571b9 100644 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala @@ -17,58 +17,180 @@ package org.apache.spark.status.api.v1.streaming -import javax.ws.rs.Path +import java.util.{Arrays => JArrays, Collections, Date, List => JList} +import javax.ws.rs.{GET, Path, PathParam, Produces, 
QueryParam} +import javax.ws.rs.core.MediaType import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.streaming.Time import org.apache.spark.streaming.ui.StreamingJobProgressListener +import org.apache.spark.streaming.ui.StreamingJobProgressListener._ import org.apache.spark.ui.SparkUI -private[v1] class ApiStreamingRootResource(ui: SparkUI) { - - import org.apache.spark.status.api.v1.streaming.ApiStreamingRootResource._ +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class ApiStreamingRootResource extends BaseStreamingAppResource { + @GET @Path("statistics") - def getStreamingStatistics(): StreamingStatisticsResource = { - new StreamingStatisticsResource(getListener(ui)) + def streamingStatistics(): StreamingStatistics = withListener { listener => + val batches = listener.retainedBatches + val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration)) + val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay)) + val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay)) + val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay)) + + new StreamingStatistics( + startTime = new Date(listener.startTime), + batchDuration = listener.batchDuration, + numReceivers = listener.numReceivers, + numActiveReceivers = listener.numActiveReceivers, + numInactiveReceivers = listener.numInactiveReceivers, + numTotalCompletedBatches = listener.numTotalCompletedBatches, + numRetainedCompletedBatches = listener.retainedCompletedBatches.size, + numActiveBatches = listener.numUnprocessedBatches, + numProcessedRecords = listener.numTotalProcessedRecords, + numReceivedRecords = listener.numTotalReceivedRecords, + avgInputRate = avgInputRate, + avgSchedulingDelay = avgSchedulingDelay, + avgProcessingTime = avgProcessingTime, + avgTotalDelay = avgTotalDelay + ) } + @GET @Path("receivers") - def getReceivers(): AllReceiversResource = { - new AllReceiversResource(getListener(ui)) + def receiversList(): 
Seq[ReceiverInfo] = withListener { listener => + listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) => + val receiverInfo = listener.receiverInfo(streamId) + val streamName = receiverInfo.map(_.name) + .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId") + val avgEventRate = + if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size) + + val (errorTime, errorMessage, error) = receiverInfo match { + case None => (None, None, None) + case Some(info) => + val someTime = + if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None + val someMessage = + if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None + val someError = + if (info.lastError.length > 0) Some(info.lastError) else None + + (someTime, someMessage, someError) + } + + new ReceiverInfo( + streamId = streamId, + streamName = streamName, + isActive = receiverInfo.map(_.active), + executorId = receiverInfo.map(_.executorId), + executorHost = receiverInfo.map(_.location), + lastErrorTime = errorTime, + lastErrorMessage = errorMessage, + lastError = error, + avgEventRate = avgEventRate, + eventRates = eventRates + ) + }.toSeq.sortBy(_.streamId) } + @GET @Path("receivers/{streamId: \\d+}") - def getReceiver(): OneReceiverResource = { - new OneReceiverResource(getListener(ui)) + def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = { + receiversList().find { _.streamId == streamId }.getOrElse( + throw new NotFoundException("unknown receiver: " + streamId)) } + @GET @Path("batches") - def getBatches(): AllBatchesResource = { - new AllBatchesResource(getListener(ui)) + def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = { + withListener { listener => + val statuses = + if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams + val statusToBatches = Seq( + BatchStatus.COMPLETED -> listener.retainedCompletedBatches, + 
BatchStatus.QUEUED -> listener.waitingBatches, + BatchStatus.PROCESSING -> listener.runningBatches + ) + + val batchInfos = for { + (status, batches) <- statusToBatches + batch <- batches if statuses.contains(status) + } yield { + val batchId = batch.batchTime.milliseconds + val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption + + new BatchInfo( + batchId = batchId, + batchTime = new Date(batchId), + status = status.toString, + batchDuration = listener.batchDuration, + inputSize = batch.numRecords, + schedulingDelay = batch.schedulingDelay, + processingTime = batch.processingDelay, + totalDelay = batch.totalDelay, + numActiveOutputOps = batch.numActiveOutputOp, + numCompletedOutputOps = batch.numCompletedOutputOp, + numFailedOutputOps = batch.numFailedOutputOp, + numTotalOutputOps = batch.outputOperations.size, + firstFailureReason = firstFailureReason + ) + } + + batchInfos.sortBy(- _.batchId) + } } + @GET @Path("batches/{batchId: \\d+}") - def getBatch(): OneBatchResource = { - new OneBatchResource(getListener(ui)) + def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = { + batchesList(Collections.emptyList()).find { _.batchId == batchId }.getOrElse( + throw new NotFoundException("unknown batch: " + batchId)) } + @GET @Path("batches/{batchId: \\d+}/operations") - def getOutputOperations(): AllOutputOperationsResource = { - new AllOutputOperationsResource(getListener(ui)) + def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = { + withListener { listener => + val ops = listener.getBatchUIData(Time(batchId)) match { + case Some(batch) => + for ((opId, op) <- batch.outputOperations) yield { + val jobIds = batch.outputOpIdSparkJobIdPairs + .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted + + new OutputOperationInfo( + outputOpId = opId, + name = op.name, + description = op.description, + startTime = op.startTime.map(new Date(_)), + endTime = op.endTime.map(new Date(_)), + duration = 
op.duration, + failureReason = op.failureReason, + jobIds = jobIds + ) + } + case None => throw new NotFoundException("unknown batch: " + batchId) + } + ops.toSeq + } } + @GET @Path("batches/{batchId: \\d+}/operations/{outputOpId: \\d+}") - def getOutputOperation(): OneOutputOperationResource = { - new OneOutputOperationResource(getListener(ui)) + def oneOperation( + @PathParam("batchId") batchId: Long, + @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = { + operationsList(batchId).find { _.outputOpId == opId }.getOrElse( + throw new NotFoundException("unknown output operation: " + opId)) } -} + private def avgRate(data: Seq[Double]): Option[Double] = { + if (data.isEmpty) None else Some(data.sum / data.size) + } -private[v1] object ApiStreamingRootResource { - def getListener(ui: SparkUI): StreamingJobProgressListener = { - ui.getStreamingJobProgressListener match { - case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener] - case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName) - } + private def avgTime(data: Seq[Long]): Option[Long] = { + if (data.isEmpty) None else Some(data.sum / data.size) } + } http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala deleted file mode 100644 index d3c689c..0000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneBatchResource(listener: StreamingJobProgressListener) { - - @GET - def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = { - val someBatch = AllBatchesResource.batchInfoList(listener) - .find { _.batchId == batchId } - someBatch.getOrElse(throw new NotFoundException("unknown batch: " + batchId)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala deleted file mode 100644 index aabcdb2..0000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.streaming.ui.StreamingJobProgressListener -import org.apache.spark.streaming.ui.StreamingJobProgressListener._ - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneOutputOperationResource(listener: StreamingJobProgressListener) { - - @GET - def oneOperation( - @PathParam("batchId") batchId: Long, - @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = { - - val someOutputOp = AllOutputOperationsResource.outputOperationInfoList(listener, batchId) - .find { _.outputOpId == opId } - someOutputOp.getOrElse(throw new NotFoundException("unknown output operation: " + opId)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala deleted file mode 100644 index c0cc99d..0000000 
--- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.api.v1.streaming - -import javax.ws.rs.{GET, PathParam, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.status.api.v1.NotFoundException -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class OneReceiverResource(listener: StreamingJobProgressListener) { - - @GET - def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = { - val someReceiver = AllReceiversResource.receiverInfoList(listener) - .find { _.streamId == streamId } - someReceiver.getOrElse(throw new NotFoundException("unknown receiver: " + streamId)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/39b3f10d/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala 
b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala deleted file mode 100644 index 6cff87b..0000000 --- a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.api.v1.streaming - -import java.util.Date -import javax.ws.rs.{GET, Produces} -import javax.ws.rs.core.MediaType - -import org.apache.spark.streaming.ui.StreamingJobProgressListener - -@Produces(Array(MediaType.APPLICATION_JSON)) -private[v1] class StreamingStatisticsResource(listener: StreamingJobProgressListener) { - - @GET - def streamingStatistics(): StreamingStatistics = { - listener.synchronized { - val batches = listener.retainedBatches - val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration)) - val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay)) - val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay)) - val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay)) - - new StreamingStatistics( - startTime = new Date(listener.startTime), - batchDuration = listener.batchDuration, - numReceivers = listener.numReceivers, - numActiveReceivers = listener.numActiveReceivers, - numInactiveReceivers = listener.numInactiveReceivers, - numTotalCompletedBatches = listener.numTotalCompletedBatches, - numRetainedCompletedBatches = listener.retainedCompletedBatches.size, - numActiveBatches = listener.numUnprocessedBatches, - numProcessedRecords = listener.numTotalProcessedRecords, - numReceivedRecords = listener.numTotalReceivedRecords, - avgInputRate = avgInputRate, - avgSchedulingDelay = avgSchedulingDelay, - avgProcessingTime = avgProcessingTime, - avgTotalDelay = avgTotalDelay - ) - } - } - - private def avgRate(data: Seq[Double]): Option[Double] = { - if (data.isEmpty) None else Some(data.sum / data.size) - } - - private def avgTime(data: Seq[Long]): Option[Long] = { - if (data.isEmpty) None else Some(data.sum / data.size) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
