Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c859be2dd -> 42d933fbb


[SPARK-11112] DAG visualization: display RDD callsite

<img width="548" alt="screen shot 2015-11-01 at 9 42 33 am" 
src="https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png";>
mateiz sarutak

Author: Andrew Or <and...@databricks.com>

Closes #9398 from andrewor14/rdd-callsite.
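
The net effect: each RDD node in the DAG visualization now carries the
callsite at which the RDD was created, surfaced in the node's tooltip.
A rough sketch of what gets recorded (names and line numbers are made up,
and RDD.creationSite is private[spark], so it appears here purely to
illustrate the captured value):

    // Hypothetical driver program.
    val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
    val rdd = sc.parallelize(1 to 100)   // creation site is captured here
    // rdd.creationSite.shortForm would be something like
    // "parallelize at MyApp.scala:7"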

(cherry picked from commit 7f741905b06ed6d3dfbff6db41a3355dab71aa3c)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d933fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d933fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d933fb

Branch: refs/heads/branch-1.6
Commit: 42d933fbba0584b39bd8218eafc44fb03aeb157d
Parents: c859be2
Author: Andrew Or <and...@databricks.com>
Authored: Sat Nov 7 05:35:53 2015 +0100
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 9 09:59:20 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/ui/static/spark-dag-viz.css    |  4 +++
 .../org/apache/spark/storage/RDDInfo.scala      | 16 +++++++--
 .../spark/ui/scope/RDDOperationGraph.scala      | 10 +++---
 .../org/apache/spark/util/JsonProtocol.scala    | 17 ++++++++-
 .../scala/org/apache/spark/util/Utils.scala     |  1 +
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 14 ++++----
 .../apache/spark/util/JsonProtocolSuite.scala   | 37 ++++++++++++++++----
 7 files changed, 79 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
index 3b4ae2e..9cc5c79 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -122,3 +122,7 @@
   stroke: #52C366;
   stroke-width: 2px;
 }
+
+.tooltip-inner {
+  white-space: pre-wrap;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 9606262..3fa209b 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDDOperationScope, RDD}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}
 
 @DeveloperApi
 class RDDInfo(
@@ -28,9 +28,20 @@ class RDDInfo(
     val numPartitions: Int,
     var storageLevel: StorageLevel,
     val parentIds: Seq[Int],
+    val callSite: CallSite,
     val scope: Option[RDDOperationScope] = None)
   extends Ordered[RDDInfo] {
 
+  def this(
+      id: Int,
+      name: String,
+      numPartitions: Int,
+      storageLevel: StorageLevel,
+      parentIds: Seq[Int],
+      scope: Option[RDDOperationScope] = None) {
+    this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope)
+  }
+
   var numCachedPartitions = 0
   var memSize = 0L
   var diskSize = 0L
@@ -56,6 +67,7 @@ private[spark] object RDDInfo {
   def fromRdd(rdd: RDD[_]): RDDInfo = {
     val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
     val parentIds = rdd.dependencies.map(_.rdd.id)
-    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
+    new RDDInfo(rdd.id, rddName, rdd.partitions.length,
+      rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
   }
 }
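
The auxiliary constructor keeps the pre-1.6 signature source-compatible.
A minimal sketch of how the two forms relate (CallSite is private[spark],
so this compiles only inside Spark; values are illustrative):

    import org.apache.spark.storage.{RDDInfo, StorageLevel}
    import org.apache.spark.util.CallSite

    // New primary constructor with an explicit callsite.
    val a = new RDDInfo(0, "rdd", 4, StorageLevel.MEMORY_ONLY, Seq.empty,
      CallSite("parallelize at App.scala:10", "full stack trace"))

    // Old-style call: delegates to the primary form with CallSite.empty.
    val b = new RDDInfo(0, "rdd", 4, StorageLevel.MEMORY_ONLY, Seq.empty)
    assert(b.callSite == CallSite.empty)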

http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 81f168a..2427456 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer}
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.CallSite
 
 /**
  * A representation of a generic cluster graph used for storing information on RDD operations.
@@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph(
     rootCluster: RDDOperationCluster)
 
 /** A node in an RDDOperationGraph. This represents an RDD. */
-private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
+private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)
 
 /**
  * A directed edge connecting two nodes in an RDDOperationGraph.
@@ -104,8 +105,8 @@ private[ui] object RDDOperationGraph extends Logging {
       edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
 
       // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
-      val node = nodes.getOrElseUpdate(
-        rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != 
StorageLevel.NONE))
+      val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
+        rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
 
       if (rdd.scope.isEmpty) {
         // This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -177,7 +178,8 @@ private[ui] object RDDOperationGraph extends Logging {
 
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} [${node.id}]"]"""
+    val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
+    s"""${node.id} [label="$label"]"""
   }
 
   /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
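
With the label change, each dot node renders on two lines, and the new
.tooltip-inner { white-space: pre-wrap } rule in spark-dag-viz.css keeps
the embedded "\n" as a real line break in the tooltip. A hedged sketch of
the output (callsite value made up):

    // makeDotNode(RDDOperationNode(0, "ParallelCollectionRDD", cached = false,
    //   CallSite("parallelize at App.scala:10", "full stack trace")))
    // now yields:
    //   0 [label="ParallelCollectionRDD [0]\nparallelize at App.scala:10"]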

http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee2eb58..c9beeb2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -398,6 +398,7 @@ private[spark] object JsonProtocol {
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
     ("Scope" -> rddInfo.scope.map(_.toJson)) ~
+    ("Callsite" -> callsiteToJson(rddInfo.callSite)) ~
     ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
@@ -407,6 +408,11 @@ private[spark] object JsonProtocol {
     ("Disk Size" -> rddInfo.diskSize)
   }
 
+  def callsiteToJson(callsite: CallSite): JValue = {
+    ("Short Form" -> callsite.shortForm) ~
+    ("Long Form" -> callsite.longForm)
+  }
+
   def storageLevelToJson(storageLevel: StorageLevel): JValue = {
     ("Use Disk" -> storageLevel.useDisk) ~
     ("Use Memory" -> storageLevel.useMemory) ~
@@ -851,6 +857,9 @@ private[spark] object JsonProtocol {
     val scope = Utils.jsonOption(json \ "Scope")
       .map(_.extract[String])
       .map(RDDOperationScope.fromJson)
+    val callsite = Utils.jsonOption(json \ "Callsite")
+      .map(callsiteFromJson)
+      .getOrElse(CallSite.empty)
     val parentIds = Utils.jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
@@ -863,7 +872,7 @@ private[spark] object JsonProtocol {
       .getOrElse(json \ "Tachyon Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
-    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope)
+    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
     rddInfo.externalBlockStoreSize = externalBlockStoreSize
@@ -871,6 +880,12 @@ private[spark] object JsonProtocol {
     rddInfo
   }
 
+  def callsiteFromJson(json: JValue): CallSite = {
+    val shortForm = (json \ "Short Form").extract[String]
+    val longForm = (json \ "Long Form").extract[String]
+    CallSite(shortForm, longForm)
+  }
+
   def storageLevelFromJson(json: JValue): StorageLevel = {
     val useDisk = (json \ "Use Disk").extract[Boolean]
     val useMemory = (json \ "Use Memory").extract[Boolean]
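
Round-tripping through the new helpers is symmetric. A sketch of the JSON
involved (JsonProtocol and CallSite are private[spark], so this compiles
only inside Spark; strings are illustrative):

    import org.apache.spark.util.{CallSite, JsonProtocol}
    import org.json4s.jackson.JsonMethods.{compact, render}

    val cs = CallSite("map at App.scala:12", "full stack trace")
    val json = JsonProtocol.callsiteToJson(cs)
    // compact(render(json)) ==
    //   {"Short Form":"map at App.scala:12","Long Form":"full stack trace"}
    assert(JsonProtocol.callsiteFromJson(json) == cs)

Event logs written before 1.6 simply lack the "Callsite" field, in which
case rddInfoFromJson falls back to CallSite.empty rather than failing.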

http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5a976ee..316c194 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -57,6 +57,7 @@ private[spark] case class CallSite(shortForm: String, longForm: String)
 private[spark] object CallSite {
   val SHORT_FORM = "callSite.short"
   val LONG_FORM = "callSite.long"
+  val empty = CallSite("", "")
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 18eec7d..ceecfd6 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -615,29 +615,29 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       assert(stage0.contains("digraph G {\n  subgraph clusterstage_0 {\n    " +
         "label=&quot;Stage 0&quot;;\n    subgraph "))
       assert(stage0.contains("{\n      label=&quot;parallelize&quot;;\n      " 
+
-        "0 [label=&quot;ParallelCollectionRDD [0]&quot;];\n    }"))
+        "0 [label=&quot;ParallelCollectionRDD [0]"))
       assert(stage0.contains("{\n      label=&quot;map&quot;;\n      " +
-        "1 [label=&quot;MapPartitionsRDD [1]&quot;];\n    }"))
+        "1 [label=&quot;MapPartitionsRDD [1]"))
       assert(stage0.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "2 [label=&quot;MapPartitionsRDD [2]&quot;];\n    }"))
+        "2 [label=&quot;MapPartitionsRDD [2]"))
 
       val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
         "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
       assert(stage1.contains("digraph G {\n  subgraph clusterstage_1 {\n    " +
         "label=&quot;Stage 1&quot;;\n    subgraph "))
       assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "3 [label=&quot;ShuffledRDD [3]&quot;];\n    }"))
+        "3 [label=&quot;ShuffledRDD [3]"))
       assert(stage1.contains("{\n      label=&quot;map&quot;;\n      " +
-        "4 [label=&quot;MapPartitionsRDD [4]&quot;];\n    }"))
+        "4 [label=&quot;MapPartitionsRDD [4]"))
       assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "5 [label=&quot;MapPartitionsRDD [5]&quot;];\n    }"))
+        "5 [label=&quot;MapPartitionsRDD [5]"))
 
       val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
         "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
       assert(stage2.contains("digraph G {\n  subgraph clusterstage_2 {\n    " +
         "label=&quot;Stage 2&quot;;\n    subgraph "))
       assert(stage2.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "6 [label=&quot;ShuffledRDD [6]&quot;];\n    }"))
+        "6 [label=&quot;ShuffledRDD [6]"))
     }
   }
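
The expectations above now assert only a prefix of each node label: the
full label gained a callsite suffix whose file/line component would make
exact matches brittle across edits to the test file. For example (line
number made up), a full rendered label might look like:

    0 [label=&quot;ParallelCollectionRDD [0]\nparallelize at UISeleniumSuite.scala:605&quot;];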
 

http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 953456c..3f94ef7 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -111,6 +111,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("Dependent Classes") {
     val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
+    testCallsite(CallSite("happy", "birthday"))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(
@@ -163,6 +164,10 @@ class JsonProtocolSuite extends SparkFunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }
 
+  /* ============================== *
+   |  Backward compatibility tests  |
+   * ============================== */
+
   test("ExceptionFailure backward compatibility") {
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
       None, None)
@@ -334,14 +339,17 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
   }
 
-  test("RDDInfo backward compatibility (scope, parent IDs)") {
-    // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
-    val rddInfo = new RDDInfo(
-      1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new 
RDDOperationScope("fable")))
+  test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
+    // "Scope" and "Parent IDs" were introduced in Spark 1.4.0
+    // "Callsite" was introduced in Spark 1.6.0
+    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
+      CallSite("short", "long"), Some(new RDDOperationScope("fable")))
     val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
       .removeField({ _._1 == "Parent IDs"})
       .removeField({ _._1 == "Scope"})
-    val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None)
+      .removeField({ _._1 == "Callsite"})
+    val expectedRddInfo = new RDDInfo(
+      1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = 
None)
     assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
   }
 
@@ -389,6 +397,11 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(info, newInfo)
   }
 
+  private def testCallsite(callsite: CallSite): Unit = {
+    val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite))
+    assert(callsite === newCallsite)
+  }
+
   private def testStageInfo(info: StageInfo) {
     val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
     assertEquals(info, newInfo)
@@ -713,7 +726,8 @@ class JsonProtocolSuite extends SparkFunSuite {
   }
 
   private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
-    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 
7))
+    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK,
+      Seq(1, 4, 7), CallSite(a.toString, b.toString))
     r.numCachedPartitions = c
     r.memSize = d
     r.diskSize = e
@@ -856,6 +870,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |      {
       |        "RDD ID": 101,
       |        "Name": "mayor",
+      |        "Callsite": {"Short Form": "101", "Long Form": "201"},
       |        "Parent IDs": [1, 4, 7],
       |        "Storage Level": {
       |          "Use Disk": true,
@@ -1258,6 +1273,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 1,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "1", "Long Form": "200"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1301,6 +1317,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 2,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "2", "Long Form": "400"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1318,6 +1335,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 3,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "3", "Long Form": "401"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1361,6 +1379,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 3,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "3", "Long Form": "600"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1378,6 +1397,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 4,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "4", "Long Form": "601"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1395,6 +1415,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 5,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "5", "Long Form": "602"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1438,6 +1459,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 4,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "4", "Long Form": "800"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1455,6 +1477,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 5,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "5", "Long Form": "801"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1472,6 +1495,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 6,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "6", "Long Form": "802"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
@@ -1489,6 +1513,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        {
       |          "RDD ID": 7,
       |          "Name": "mayor",
+      |          "Callsite": {"Short Form": "7", "Long Form": "803"},
       |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,

