This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4274fb8806c [SPARK-38550][SQL][CORE] Use a disk-based store to save more debug information for live UI
4274fb8806c is described below
commit 4274fb8806c25519cac829d00b01132fe08c0cac
Author: Linhong Liu <[email protected]>
AuthorDate: Thu Apr 14 14:01:03 2022 +0800
[SPARK-38550][SQL][CORE] Use a disk-based store to save more debug information for live UI
### What changes were proposed in this pull request?
In Spark, the UI lacks troubleshooting abilities. For example:
* AQE plan changes are not available
* plan description of a large plan is truncated
This is because the live UI depends on an in-memory KV store, so adding more
information to it always raises stability concerns. Therefore, it's better to
add a disk-based store to save the extra information.
This PR includes:
* A disk-based KV store in AppStatusStore that allows adding information that does not fit in memory
* A separate listener that collects diagnostic data and saves it to the disk store
* A new REST API endpoint to expose the diagnostic data (AQE plan changes, untruncated plan)
### Why are the changes needed?
Troubleshooting ability is highly needed: without it, AQE-related issues are
hard to debug. Once these blockers are solved, we can make a long-term plan to
improve observability.
### Does this PR introduce _any_ user-facing change?
Yes, a new REST API that exposes more information about the application.
REST API endpoint:
http://localhost:4040/api/v1/applications/local-1647312132944/diagnostics/sql/0
Example:
```
$ ./bin/spark-shell --conf spark.appStatusStore.diskStoreDir=/tmp/diskstore
spark-shell> val df = sql(
"""SELECT t1.*, t2.c, t3.d
| FROM (SELECT 1 as a, 'b' as b) t1
| JOIN (SELECT 1 as a, 'c' as c) t2
| ON t1.a = t2.a
| JOIN (SELECT 1 as a, 'd' as d) t3
| ON t2.a = t3.a
|""".stripMargin)
df.show()
```
Output:
```json
{
"id" : 0,
"physicalPlan" : "<plan description string>",
"submissionTime" : "2022-03-15T03:41:42.226GMT",
"completionTime" : "2022-03-15T03:41:43.387GMT",
"errorMessage" : "",
"planChanges" : [ {
"physicalPlan" : "<plan description string>",
"updateTime" : "2022-03-15T03:41:42.268GMT"
}, {
"physicalPlan" : "<plan description string>",
"updateTime" : "2022-03-15T03:41:43.262GMT"
} ]
}
```
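For reference, a minimal sketch of fetching the same payload programmatically (the application id and execution id are the example values from the run above and will differ per run):
```scala
// A sketch, not part of this change: read the diagnostics endpoint over HTTP
// with the standard library. The ids below are example values.
import scala.io.Source

val url = "http://localhost:4040/api/v1/applications/local-1647312132944/diagnostics/sql/0"
val json = Source.fromURL(url).mkString
println(json) // prints the JSON document shown above
```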
### How was this patch tested?
Manually tested.
Closes #35856 from linhongliu-db/diagnostic.
Authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/internal/config/Status.scala | 8 ++
.../org/apache/spark/status/AppStatusStore.scala | 27 ++++-
docs/monitoring.md | 9 ++
.../org/apache/spark/sql/internal/SQLConf.scala | 11 ++
.../spark/sql/diagnostic/DiagnosticListener.scala | 112 +++++++++++++++++++++
.../spark/sql/diagnostic/DiagnosticStore.scala | 73 ++++++++++++++
.../spark/sql/execution/QueryExecution.scala | 6 +-
.../apache/spark/sql/execution/SQLExecution.scala | 6 +-
.../spark/sql/execution/ui/SQLListener.scala | 6 +-
.../apache/spark/sql/internal/SharedState.scala | 7 ++
.../status/api/v1/sql/ApiSqlRootResource.scala | 11 ++
.../status/api/v1/sql/SQLDiagnosticResource.scala | 67 ++++++++++++
.../org/apache/spark/status/api/v1/sql/api.scala | 10 ++
13 files changed, 345 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Status.scala b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
index 669fa07053c..1db7267237f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Status.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Status.scala
@@ -70,4 +70,12 @@ private[spark] object Status {
.version("3.0.0")
.booleanConf
.createWithDefault(false)
+
+ val DISK_STORE_DIR_FOR_STATUS =
+ ConfigBuilder("spark.appStatusStore.diskStoreDir")
+ .doc("Local directory where to store app status. " +
+ "It's an alternative to the in-memory kv store")
+ .version("3.4.0")
+ .stringConf
+ .createOptional
}
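For illustration, a sketch of setting the new config from code instead of the command line (the app name, master, and directory are example values; the config must be set before the SparkContext is created):
```scala
// A sketch, assuming no SparkContext is running yet in this JVM.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("diagnostics-demo") // example name
  .master("local[*]")
  .config("spark.appStatusStore.diskStoreDir", "/tmp/diskstore") // example path
  .getOrCreate()
```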
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 34155e3e330..b455850d609 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,12 +17,17 @@
package org.apache.spark.status
+import java.io.File
+import java.nio.file.Files
import java.util.{List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
import org.apache.spark.ui.scope._
@@ -34,6 +39,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
*/
private[spark] class AppStatusStore(
val store: KVStore,
+ val diskStore: Option[KVStore] = None,
val listener: Option[AppStatusListener] = None) {
def applicationInfo(): v1.ApplicationInfo = {
@@ -755,18 +761,33 @@ private[spark] class AppStatusStore(
}
}
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
val CURRENT_VERSION = 2L
/**
- * Create an in-memory store for a live application.
+ * Create an in-memory store for a live application. Also create a disk store if
+ * `spark.appStatusStore.diskStoreDir` is set.
*/
def createLiveStore(
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
- new AppStatusStore(store, listener = Some(listener))
+ // create a disk-based kv store if the directory is set
+ val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
+ val storePath = Files.createDirectories(
+ new File(storeDir, System.currentTimeMillis().toString).toPath
+ ).toFile
+ try {
+ Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf))
+ .map(new ElementTrackingStore(_, conf))
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Failed to create disk-based app status store: ", e)
+ None
+ }
+ }
+ new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
}
}
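Note the on-disk layout this implies: each run creates a fresh timestamped subdirectory under the configured path. A sketch of the same call in isolation (the directory is an example value):
```scala
// Mirrors the createLiveStore change above: one timestamped subdirectory per run.
import java.io.File
import java.nio.file.Files

val storeDir = "/tmp/diskstore" // example value of spark.appStatusStore.diskStoreDir
val storePath = Files.createDirectories(
  new File(storeDir, System.currentTimeMillis().toString).toPath).toFile
println(storePath) // e.g. /tmp/diskstore/1647312132944
```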
diff --git a/docs/monitoring.md b/docs/monitoring.md
index f2c6e379749..6d1bd2eefcc 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -611,6 +611,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand for the given query when Physical Plan size is high.
</td>
</tr>
+ <tr>
+ <td><code>/applications/[app-id]/diagnostics/sql/[execution-id]</code></td>
+ <td>Diagnostics for the given query. It includes:
+ <br>
+ 1. Plan change history of adaptive execution
+ <br>
+ 2. Physical plan description with unlimited fields
+ </td>
+ </tr>
<tr>
<td><code>/applications/[app-id]/environment</code></td>
<td>Environment details of the given application.</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 69ea91f8d3e..ac2a2e350c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3198,6 +3198,15 @@ object SQLConf {
.intConf
.createWithDefault(25)
+ val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
+ buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
+ .doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect
when the " +
+ s"output will be stored for the diagnostics API. The output will be
stored in " +
+ s"disk instead of memory. So it can be larger than
${MAX_TO_STRING_FIELDS.key}")
+ .version("3.4.0")
+ .intConf
+ .createWithDefault(10000)
+
val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
.doc("Maximum number of characters to output for a plan string. If the
plan is " +
"longer, further output will be truncated. The default setting always
generates a full " +
@@ -4451,6 +4460,8 @@ class SQLConf extends Serializable with Logging {
def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
+ def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)
+
def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
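As a sketch of how the two limits interact, the diagnostic limit can be raised at runtime independently of the UI limit (the value is an example; `spark` is an active SparkSession):
```scala
// The UI output keeps truncating at spark.sql.debug.maxToStringFields
// (default 25), while the diagnostics store uses the larger limit below.
spark.conf.set("spark.sql.debug.maxToStringFieldsForDiagnostic", 50000L)
```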
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
new file mode 100644
index 00000000000..7ce1093e879
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.ExplainMode
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+
+/**
+ * A Spark listener that writes diagnostic information to a data store. The information can be
+ * accessed by the public REST API.
+ *
+ * @param kvStore used to store the diagnostic information
+ */
+class DiagnosticListener(
+ conf: SparkConf,
+ kvStore: ElementTrackingStore) extends SparkListener {
+
+ kvStore.addTrigger(
+ classOf[ExecutionDiagnosticData],
+ conf.get(UI_RETAINED_EXECUTIONS)) { count =>
+ cleanupExecutions(count)
+ }
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+ case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+ case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e)
+ case _ => // Ignore
+ }
+
+ private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
+ val data = new AdaptiveExecutionUpdate(
+ event.executionId,
+ System.currentTimeMillis(),
+ event.physicalPlanDescription
+ )
+ kvStore.write(data)
+ }
+
+ private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+ val sqlConf = event.qe.sparkSession.sessionState.conf
+ val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
+ val physicalPlan = event.qe.explainString(
+ planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
+ val data = new ExecutionDiagnosticData(
+ event.executionId,
+ physicalPlan,
+ event.time,
+ None,
+ None
+ )
+ // Check triggers since it's adding new entries
+ kvStore.write(data, checkTriggers = true)
+ }
+
+ private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+ try {
+ val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId)
+ val sqlConf = event.qe.sparkSession.sessionState.conf
+ val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
+ val physicalPlan = event.qe.explainString(
+ planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
+ val data = new ExecutionDiagnosticData(
+ event.executionId,
+ physicalPlan,
+ existing.submissionTime,
+ Some(event.time),
+ event.executionFailure.map(
+ e => s"${e.getClass.getCanonicalName}: ${e.getMessage}").orElse(Some(""))
+ )
+ kvStore.write(data)
+ } catch {
+ case _: NoSuchElementException =>
+ // This is possibly caused by the query failing before execution.
+ }
+ }
+
+ private def cleanupExecutions(count: Long): Unit = {
+ val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
+ if (countToDelete <= 0) {
+ return
+ }
+ val view = kvStore.view(classOf[ExecutionDiagnosticData]).index("completionTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
+ toDelete.foreach(e => kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId))
+ kvStore.removeAllByIndexValues(
+ classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId))
+ }
+}
+
+object DiagnosticListener {
+ val QUEUE_NAME = "diagnostics"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
new file mode 100644
index 00000000000..236ee104f0e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.diagnostic
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query diagnostic-specific
+ * information. There's no state kept in this class, so it's ok to have multiple instances
+ * of it in an application.
+ */
+class DiagnosticStore(store: KVStore) {
+
+ def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
+ store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq
+ }
+
+ def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
+ try {
+ Some(store.read(classOf[ExecutionDiagnosticData], executionId))
+ } catch {
+ case _: NoSuchElementException => None
+ }
+ }
+
+ def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
+ store.view(classOf[AdaptiveExecutionUpdate])
+ .index("updateTime")
+ .parent(executionId)
+ .asScala
+ .toSeq
+ }
+}
+
+/* Represents the diagnostic data of a SQL execution */
+class ExecutionDiagnosticData(
+ @KVIndexParam val executionId: Long,
+ val physicalPlan: String,
+ val submissionTime: Long,
+ val completionTime: Option[Long],
+ val errorMessage: Option[String])
+
+/* Represents the plan change of an adaptive execution */
+class AdaptiveExecutionUpdate(
+ @KVIndexParam("id")
+ val executionId: Long,
+ @KVIndexParam(value = "updateTime", parent = "id")
+ val updateTime: Long,
+ val physicalPlan: String) {
+
+ @JsonIgnore @KVIndex
+ private def naturalIndex: Array[Long] = Array(executionId, updateTime)
+}
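For illustration, a sketch of querying this store directly, assuming `kvStore` is a `KVStore` already opened over the diagnostics directory (as `AppStatusStore.createLiveStore` does above):
```scala
// A sketch: read one execution's diagnostics back from an existing store.
// `kvStore` is an assumed, already-opened KVStore instance; id 0 is an example.
val diagnostics = new DiagnosticStore(kvStore)

diagnostics.diagnostic(0L).foreach { d =>
  println(d.physicalPlan)                  // untruncated final plan
  diagnostics.adaptiveExecutionUpdates(0L) // AQE plan-change history
    .foreach(u => println(s"plan updated at ${u.updateTime}"))
}
```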
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9bf8de5ea6c..4b74a96702c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -223,9 +223,11 @@ class QueryExecution(
append("\n")
}
- def explainString(mode: ExplainMode): String = {
+ def explainString(
+ mode: ExplainMode,
+ maxFields: Int = SQLConf.get.maxToStringFields): String = {
val concat = new PlanStringConcat()
- explainString(mode, SQLConf.get.maxToStringFields, concat.append)
+ explainString(mode, maxFields, concat.append)
withRedaction {
concat.toString
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 748f75b1862..953c370297f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -96,7 +96,7 @@ object SQLExecution {
var ex: Option[Throwable] = None
val startTime = System.nanoTime()
try {
- sc.listenerBus.post(SparkListenerSQLExecutionStart(
+ val event = SparkListenerSQLExecutionStart(
executionId = executionId,
description = desc,
details = callSite.longForm,
@@ -105,7 +105,9 @@ object SQLExecution {
// will be caught and reported in the `SparkListenerSQLExecutionEnd`
sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
time = System.currentTimeMillis(),
- redactedConfigs))
+ redactedConfigs)
+ event.qe = queryExecution
+ sc.listenerBus.post(event)
body
} catch {
case e: Throwable =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 26805e135b7..e3f51cbe3b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -49,7 +49,11 @@ case class SparkListenerSQLExecutionStart(
sparkPlanInfo: SparkPlanInfo,
time: Long,
modifiedConfigs: Map[String, String] = Map.empty)
- extends SparkListenerEvent
+ extends SparkListenerEvent {
+
+ // The `QueryExecution` instance that represents the SQL execution
+ @JsonIgnore private[sql] var qe: QueryExecution = null
+}
@DeveloperApi
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index e894f39d927..f6b748d2424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.diagnostic.DiagnosticListener
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
@@ -118,6 +119,12 @@ private[sql] class SharedState(
statusStore
}
+ sparkContext.statusStore.diskStore.foreach { kvStore =>
+ sparkContext.listenerBus.addToQueue(
+ new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
+ DiagnosticListener.QUEUE_NAME)
+ }
+
/**
* A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
* data to show.
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
index 747c05b9b06..6c727c4369d 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
@@ -31,4 +31,15 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
def sqlList(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): Class[SqlResource] =
classOf[SqlResource]
+
+ @Path("applications/{appId}/diagnostics/sql")
+ def sqlDiagnosticsList(
+ @PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
+ classOf[SQLDiagnosticResource]
+
+ @Path("applications/{appId}/{attemptId}/diagnostics/sql")
+ def sqlDiagnosticsList(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource]
=
+ classOf[SQLDiagnosticResource]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
new file mode 100644
index 00000000000..8a6c81ced74
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+import javax.ws.rs._
+
+import org.apache.spark.sql.diagnostic._
+import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
+
+private[v1] class SQLDiagnosticResource extends BaseAppResource {
+
+ @GET
+ def sqlDiagnosticList(
+ @DefaultValue("0") @QueryParam("offset") offset: Int,
+ @DefaultValue("20") @QueryParam("length") length: Int):
Seq[SQLDiagnosticData] = {
+ withUI { ui =>
+ ui.store.diskStore.map { kvStore =>
+ val store = new DiagnosticStore(kvStore)
+ store.diagnosticsList(offset, length)
+ // Do not display the plan changes in the list
+ .map(d => prepareSqlDiagnosticData(d, Seq.empty))
+ }.getOrElse(Seq.empty)
+ }
+ }
+
+ @GET
+ @Path("{executionId:\\d+}")
+ def sqlDiagnostic(
+ @PathParam("executionId") execId: Long): SQLDiagnosticData = {
+ withUI { ui =>
+ ui.store.diskStore.flatMap { kvStore =>
+ val store = new DiagnosticStore(kvStore)
+ val updates = store.adaptiveExecutionUpdates(execId)
+ store.diagnostic(execId)
+ .map(d => prepareSqlDiagnosticData(d, updates))
+ }.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
+ }
+ }
+
+ private def prepareSqlDiagnosticData(
+ diagnostic: ExecutionDiagnosticData,
+ updates: Seq[AdaptiveExecutionUpdate]): SQLDiagnosticData = {
+ new SQLDiagnosticData(
+ diagnostic.executionId,
+ diagnostic.physicalPlan,
+ new Date(diagnostic.submissionTime),
+ diagnostic.completionTime.map(t => new Date(t)),
+ diagnostic.errorMessage,
+ updates.map(u => AdaptivePlanChange(new Date(u.updateTime), u.physicalPlan)))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 0ddf66718bc..3cafc10352f 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -40,3 +40,13 @@ case class Node private[spark](
metrics: Seq[Metric])
case class Metric private[spark] (name: String, value: String)
+
+class SQLDiagnosticData private[spark] (
+ val id: Long,
+ val physicalPlan: String,
+ val submissionTime: Date,
+ val completionTime: Option[Date],
+ val errorMessage: Option[String],
+ val planChanges: Seq[AdaptivePlanChange])
+
+case class AdaptivePlanChange(updateTime: Date, physicalPlan: String)