Repository: spark
Updated Branches:
  refs/heads/master 97a466eca -> 4f035dd2c


[SPARK-611] Display executor thread dumps in web UI

This patch allows executor thread dumps to be collected on-demand and viewed in 
the Spark web UI.

The thread dumps are collected via ThreadMXBean.dumpAllThreads() (see 
Utils.getThreadDump() in the diff below).  To allow remote thread dumps to be 
triggered from the web UI, I added a new `ExecutorActor` that runs inside the 
executor's actor system and responds to RPCs from the driver.  The driver's 
mechanism for obtaining a reference to this actor is a little hacky: it asks 
the block manager master actor for the host/port of each executor's actor 
system and uses that to construct an ActorRef to the ExecutorActor.  
Unfortunately, I couldn't find a cleaner way to do this without a big 
refactoring of the executor -> driver communication.
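
For orientation, here is the round-trip this patch implements, condensed from
the diff below (a sketch with error handling elided, not additional code in
the patch):

    // Driver side (SparkContext.getExecutorThreadDump, simplified):
    val (host, port) =
      env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
    val actorRef =
      AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
    val dump = AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump,
      actorRef, AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf),
      AkkaUtils.askTimeout(conf))

    // Executor side (ExecutorActor):
    //   case TriggerThreadDump => sender ! Utils.getThreadDump()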

Screenshots:

![image](https://cloud.githubusercontent.com/assets/50748/4781793/7e7a0776-5cbf-11e4-874d-a91cd04620bd.png)

![image](https://cloud.githubusercontent.com/assets/50748/4781794/8bce76aa-5cbf-11e4-8d13-8477748c9f7e.png)

![image](https://cloud.githubusercontent.com/assets/50748/4781797/bd11a8b8-5cbf-11e4-9ad7-a7459467ec8e.png)

Author: Josh Rosen <joshro...@databricks.com>

Closes #2944 from JoshRosen/jstack-in-web-ui and squashes the following commits:

3c21a5d [Josh Rosen] Address review comments:
880f7f7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui
f719266 [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui
19707b0 [Josh Rosen] Add one comment.
127a130 [Josh Rosen] Update to use SparkContext.DRIVER_IDENTIFIER
b8e69aa [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui
3dfc2d4 [Josh Rosen] Add missing file.
bc1e675 [Josh Rosen] Undo some leftover changes from the earlier approach.
f4ac1c1 [Josh Rosen] Switch to on-demand collection of thread dumps
dfec08b [Josh Rosen] Add option to disable thread dumps in UI.
4c87d7f [Josh Rosen] Use separate RPC for sending thread dumps.
2b8bdf3 [Josh Rosen] Enable thread dumps from the driver when running in non-local mode.
cc3e6b3 [Josh Rosen] Fix test code in DAGSchedulerSuite.
87b8b65 [Josh Rosen] Add new listener event for thread dumps.
8c10216 [Josh Rosen] Add missing file.
0f198ac [Josh Rosen] [SPARK-611] Display executor thread dumps in web UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f035dd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f035dd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f035dd2

Branch: refs/heads/master
Commit: 4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84
Parents: 97a466e
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Nov 3 18:18:47 2014 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Nov 3 18:18:47 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 29 +++++++-
 .../executor/CoarseGrainedExecutorBackend.scala |  3 +-
 .../org/apache/spark/executor/Executor.scala    |  7 +-
 .../apache/spark/executor/ExecutorActor.scala   | 41 +++++++++++
 .../spark/storage/BlockManagerMaster.scala      |  4 ++
 .../spark/storage/BlockManagerMasterActor.scala | 18 +++++
 .../spark/storage/BlockManagerMessages.scala    |  2 +
 .../spark/ui/exec/ExecutorThreadDumpPage.scala  | 73 ++++++++++++++++++++
 .../apache/spark/ui/exec/ExecutorsPage.scala    | 15 +++-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |  8 ++-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 14 ++++
 .../apache/spark/util/ThreadStackTrace.scala    | 27 ++++++++
 .../scala/org/apache/spark/util/Utils.scala     | 13 ++++
 13 files changed, 247 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8b4db78..40444c2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -361,6 +361,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
   }
 
+  /**
+   * Called by the web UI to obtain executor thread dumps.  This method may be expensive.
+   * Logs an error and returns None if we failed to obtain a thread dump, which could occur due
+   * to an executor being dead or unresponsive or due to network issues while sending the thread
+   * dump message back to the driver.
+   */
+  private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
+    try {
+      if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+        Some(Utils.getThreadDump())
+      } else {
+        val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
+        val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
+        Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
+          AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Exception getting thread dump from executor $executorId", e)
+        None
+    }
+  }
+
   private[spark] def getLocalProperties: Properties = localProperties.get()
 
   private[spark] def setLocalProperties(props: Properties) {
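
For context, a hypothetical call site for the new method (it is private[spark],
so real callers must live under org.apache.spark; executor ID "1" is made up):

    sc.getExecutorThreadDump("1") match {
      case Some(threads) =>
        threads.filter(_.threadState == Thread.State.BLOCKED).foreach { t =>
          println(s"Blocked thread ${t.threadName}:\n${t.stackTrace}")
        }
      case None =>
        println("Thread dump unavailable (executor dead, unresponsive, or RPC failed)")
    }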

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 697154d..3711824 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       // Create a new ActorSystem using driver's Spark properties to run the backend.
       val driverConf = new SparkConf().setAll(props)
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
+        SparkEnv.executorActorSystemName,
+        hostname, port, driverConf, new SecurityManager(driverConf))
       // set it
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
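
Note: switching the literal "sparkExecutor" to SparkEnv.executorActorSystemName
matters because the driver must reconstruct this actor system name when it
builds the akka.tcp:// URL in AkkaUtils.makeExecutorRef (below); both sides now
share one constant instead of a duplicated string.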

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e24a15f..8b095e2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
-import akka.actor.ActorSystem
+import akka.actor.{Props, ActorSystem}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,6 +92,10 @@ private[spark] class Executor(
     }
   }
 
+  // Create an actor for receiving RPCs from the driver
+  private val executorActor = env.actorSystem.actorOf(
+    Props(new ExecutorActor(executorId)), "ExecutorActor")
+
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
   private val urlClassLoader = createClassLoader()
@@ -131,6 +135,7 @@ private[spark] class Executor(
 
   def stop() {
     env.metricsSystem.report()
+    env.actorSystem.stop(executorActor)
     isStopped = true
     threadPool.shutdown()
     if (!isLocal) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
new file mode 100644
index 0000000..41925f7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import akka.actor.Actor
+import org.apache.spark.Logging
+
+import org.apache.spark.util.{Utils, ActorLogReceive}
+
+/**
+ * Driver -> Executor message to trigger a thread dump.
+ */
+private[spark] case object TriggerThreadDump
+
+/**
+ * Actor that runs inside of executors to enable driver -> executor RPC.
+ */
+private[spark]
+class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {
+
+  override def receiveWithLogging = {
+    case TriggerThreadDump =>
+      sender ! Utils.getThreadDump()
+  }
+
+}
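
For illustration only, the actor could also be queried directly with Akka's ask
pattern (the patch itself goes through AkkaUtils.askWithReply; `ref` is assumed
to be an ActorRef resolved via AkkaUtils.makeExecutorRef):

    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import org.apache.spark.util.ThreadStackTrace

    implicit val timeout = Timeout(30.seconds)
    // Ask returns a Future[Any]; narrow it to the expected reply type.
    val dump: Array[ThreadStackTrace] = Await.result(
      (ref ? TriggerThreadDump).mapTo[Array[ThreadStackTrace]], timeout.duration)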

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index d08e141..b63c7f1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -88,6 +88,10 @@ class BlockManagerMaster(
     askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
+  def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+    askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
+  }
+
   /**
    * Remove a block from the slaves that have it. This can only be used to remove
    * blocks that the driver knows about.

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 5e375a2..685b2e1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetPeers(blockManagerId) =>
       sender ! getPeers(blockManagerId)
 
+    case GetActorSystemHostPortForExecutor(executorId) =>
+      sender ! getActorSystemHostPortForExecutor(executorId)
+
     case GetMemoryStatus =>
       sender ! memoryStatus
 
@@ -412,6 +415,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       Seq.empty
     }
   }
+
+  /**
+   * Returns the hostname and port of an executor's actor system, based on the Akka address of its
+   * BlockManagerSlaveActor.
+   */
+  private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+    for (
+      blockManagerId <- blockManagerIdByExecutor.get(executorId);
+      info <- blockManagerInfo.get(blockManagerId);
+      host <- info.slaveActor.path.address.host;
+      port <- info.slaveActor.path.address.port
+    ) yield {
+      (host, port)
+    }
+  }
 }
 
 @DeveloperApi
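
For readers less used to for-comprehensions over Option: the lookup above
yields None as soon as any step is missing (unknown executor, no block manager
info, or an actor address without host/port), and desugars to roughly this
flatMap chain (same names as the patch):

    blockManagerIdByExecutor.get(executorId).flatMap { blockManagerId =>
      blockManagerInfo.get(blockManagerId).flatMap { info =>
        info.slaveActor.path.address.host.flatMap { host =>
          info.slaveActor.path.address.port.map { port => (host, port) }
        }
      }
    }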

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 291ddfc..3f32099 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages {
 
   case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
+  case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster
+
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 
   case object StopBlockManagerMaster extends ToBlockManagerMaster

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
new file mode 100644
index 0000000..e9c755e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.util.Try
+import scala.xml.{Text, Node}
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
+
+  private val sc = parent.sc
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val executorId = Option(request.getParameter("executorId")).getOrElse {
+      return Text(s"Missing executorId parameter")
+    }
+    val time = System.currentTimeMillis()
+    val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)
+
+    val content = maybeThreadDump.map { threadDump =>
+      val dumpRows = threadDump.map { thread =>
+        <div class="accordion-group">
+          <div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
+            <a class="accordion-toggle">
+              Thread {thread.threadId}: {thread.threadName} ({thread.threadState})
+            </a>
+          </div>
+          <div class="accordion-body hidden">
+            <div class="accordion-inner">
+              <pre>{thread.stackTrace}</pre>
+            </div>
+          </div>
+        </div>
+      }
+
+      <div class="row-fluid">
+        <p>Updated at {UIUtils.formatDate(time)}</p>
+        {
+          // scalastyle:off
+          <p><a class="expandbutton"
+                onClick="$('.accordion-body').removeClass('hidden'); $('.expandbutton').toggleClass('hidden')">
+            Expand All
+          </a></p>
+          <p><a class="expandbutton hidden"
+                onClick="$('.accordion-body').addClass('hidden'); $('.expandbutton').toggleClass('hidden')">
+            Collapse All
+          </a></p>
+          // scalastyle:on
+        }
+        <div class="accordion">{dumpRows}</div>
+      </div>
+    }.getOrElse(Text("Error fetching thread dump"))
+    UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent)
+  }
+}
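
Once attached, this page is served under the executors tab and answers the
threadDump/?executorId=<id> links that ExecutorsPage adds below; a missing
executorId parameter or a failed fetch renders an error message instead of a
dump.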

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index b0e3bb3..048fee3 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -41,7 +41,10 @@ private case class ExecutorSummaryInfo(
     totalShuffleWrite: Long,
     maxMemory: Long)
 
-private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+private[ui] class ExecutorsPage(
+    parent: ExecutorsTab,
+    threadDumpEnabled: Boolean)
+  extends WebUIPage("") {
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -75,6 +78,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
               Shuffle Write
             </span>
           </th>
+          {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
         </thead>
         <tbody>
           {execInfoSorted.map(execRow)}
@@ -133,6 +137,15 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       <td sorttable_customkey={info.totalShuffleWrite.toString}>
         {Utils.bytesToString(info.totalShuffleWrite)}
       </td>
+      {
+        if (threadDumpEnabled) {
+          <td>
+            <a href={s"threadDump/?executorId=${info.id}"}>Thread Dump</a>
+          </td>
+        } else {
+          Seq.empty
+        }
+      }
     </tr>
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 9e0e71a..ba97630 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -27,8 +27,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val listener = parent.executorsListener
+  val sc = parent.sc
+  val threadDumpEnabled =
+    sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)
 
-  attachPage(new ExecutorsPage(this))
+  attachPage(new ExecutorsPage(this, threadDumpEnabled))
+  if (threadDumpEnabled) {
+    attachPage(new ExecutorThreadDumpPage(this))
+  }
 }
 
 /**
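
Because of the sc.isDefined guard, the Thread Dump column only appears when the
UI is backed by a live SparkContext. A sketch of opting out via the new
configuration key (the app name is made up):

    val conf = new SparkConf()
      .setAppName("ThreadDumpDemo")
      .set("spark.ui.threadDumpsEnabled", "false")  // hide the Thread Dump column
    val sc = new SparkContext(conf)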

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 79e398e..10010bd 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -212,4 +212,18 @@ private[spark] object AkkaUtils extends Logging {
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
   }
+
+  def makeExecutorRef(
+      name: String,
+      conf: SparkConf,
+      host: String,
+      port: Int,
+      actorSystem: ActorSystem): ActorRef = {
+    val executorActorSystemName = SparkEnv.executorActorSystemName
+    Utils.checkHost(host, "Expected hostname")
+    val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    logInfo(s"Connecting to $name: $url")
+    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+  }
 }
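
A sketch of what the new helper resolves, given a (host, port) pair returned by
getActorSystemHostPortForExecutor (hostname and port here are made up):

    val ref = AkkaUtils.makeExecutorRef(
      "ExecutorActor", conf, "worker-1.example.com", 56789, env.actorSystem)
    // Resolves akka.tcp://sparkExecutor@worker-1.example.com:56789/user/ExecutorActor,
    // since SparkEnv.executorActorSystemName is "sparkExecutor" (the literal the
    // CoarseGrainedExecutorBackend change above replaced).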

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
new file mode 100644
index 0000000..d4e0ad9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Used for shipping per-thread stacktraces from the executors to driver.
+ */
+private[spark] case class ThreadStackTrace(
+  threadId: Long,
+  threadName: String,
+  threadState: Thread.State,
+  stackTrace: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f035dd2/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a33046d..6ab94af 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
+import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
 import java.util.jar.Attributes.Name
@@ -1611,6 +1612,18 @@ private[spark] object Utils extends Logging {
     s"$className: $desc\n$st"
   }
 
+  /** Return a thread dump of all threads' stacktraces.  Used to capture dumps for the web UI */
+  def getThreadDump(): Array[ThreadStackTrace] = {
+    // We need to filter out null values here because dumpAllThreads() may return null array
+    // elements for threads that are dead / don't exist.
+    val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
+    threadInfos.sortBy(_.getThreadId).map { case threadInfo =>
+      val stackTrace = threadInfo.getStackTrace.map(_.toString).mkString("\n")
+      ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName,
+        threadInfo.getThreadState, stackTrace)
+    }
+  }
+
   /**
   * Convert all spark properties set in the given SparkConf to a sequence of java options.
    */
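
Since the helper only inspects the local JVM, it can be exercised in isolation
(the printing is illustrative):

    // Dump the current JVM's threads; runs entirely in-process.
    Utils.getThreadDump().foreach { t =>
      println(s"#${t.threadId} ${t.threadName} [${t.threadState}]")
    }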


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
