Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0327ca2b2 -> ff8b44995


http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/core/src/test/scala/org/apache/spark/JsonTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JsonTestUtils.scala b/core/src/test/scala/org/apache/spark/JsonTestUtils.scala
new file mode 100644
index 0000000..ba367cd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/JsonTestUtils.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods
+
+trait JsonTestUtils {
+  def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
+    val Diff(c, a, d) = validateJson.diff(expectedJson)
+    val validatePretty = JsonMethods.pretty(validateJson)
+    val expectedPretty = JsonMethods.pretty(expectedJson)
+    val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
+    import org.scalactic.TripleEquals._
+    assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
+    assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
+    assert(d === JNothing, s"$errorMessage\nDeleted:\n${JsonMethods.pretty(d)}")
+  }
+
+}
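
For context, a minimal sketch of how a suite would mix in this new trait (the suite name and JSON literals below are illustrative, not part of the commit):

import org.json4s.jackson.JsonMethods
import org.scalatest.FunSuite

import org.apache.spark.JsonTestUtils

// Hypothetical usage: mix in the trait, then compare a parsed JSON AST
// against an expected AST; mismatches fail with a pretty-printed
// Changed/Added/Deleted report.
class ExampleJsonSuite extends FunSuite with JsonTestUtils {
  test("example JSON comparison") {
    val found = JsonMethods.parse("""{"name": "app-1", "completed": true}""")
    val expected = JsonMethods.parse("""{"name": "app-1", "completed": true}""")
    assertValidDataInJson(found, expected)
  }
}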

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index b58d625..e04a792 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,9 +28,9 @@ import org.scalatest.FunSuite
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf}
 
-class JsonProtocolSuite extends FunSuite {
+class JsonProtocolSuite extends FunSuite with JsonTestUtils {
 
   test("writeApplicationInfo") {
     val output = JsonProtocol.writeApplicationInfo(createAppInfo())
@@ -136,16 +136,6 @@ class JsonProtocolSuite extends FunSuite {
       case e: JsonParseException => fail("Invalid Json detected", e)
     }
   }
-
-  def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
-    val Diff(c, a, d) = validateJson diff expectedJson
-    val validatePretty = JsonMethods.pretty(validateJson)
-    val expectedPretty = JsonMethods.pretty(expectedJson)
-    val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
-    assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
-    assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
-    assert(d === JNothing, s"$errorMessage\nDelected:\n${JsonMethods.pretty(d)}")
-  }
 }
 
 object JsonConstants {

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 71ba9c1..0744b68 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -14,22 +14,161 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.deploy.history
 
-import javax.servlet.http.HttpServletRequest
-
-import scala.collection.mutable
+import java.io.{File, FileInputStream, FileWriter, IOException}
+import java.net.{HttpURLConnection, URL}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
-import org.apache.hadoop.fs.Path
-import org.mockito.Mockito.{when}
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.mockito.Mockito.when
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
 import org.scalatest.mock.MockitoSugar
 
+import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf}
 import org.apache.spark.ui.SparkUI
 
-class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
+/**
+ * A collection of tests against the history server, including comparing responses from the json
+ * metrics api to a set of known "golden files".  If new endpoints / parameters are added,
+ * cases should be added to this test suite.  The expected outcomes can be generated by running
+ * the HistoryServerSuite.main.  Note that this will blindly generate new expectation files
+ * matching the current behavior -- the developer must verify that behavior is correct.
+ *
+ * Similarly, if the behavior is changed, HistoryServerSuite.main can be run to update the
+ * expectations.  However, in general this should be done with extreme caution, as the metrics
+ * are considered part of Spark's public api.
+ */
+class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar
+  with JsonTestUtils {
+
+  private val logDir = new File("src/test/resources/spark-events")
+  private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
+
+  private var provider: FsHistoryProvider = null
+  private var server: HistoryServer = null
+  private var port: Int = -1
+
+  def init(): Unit = {
+    val conf = new SparkConf()
+      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set("spark.history.fs.updateInterval", "0")
+      .set("spark.testing", "true")
+    provider = new FsHistoryProvider(conf)
+    provider.checkForLogs()
+    val securityManager = new SecurityManager(conf)
+
+    server = new HistoryServer(conf, provider, securityManager, 18080)
+    server.initialize()
+    server.bind()
+    port = server.boundPort
+  }
+
+  def stop(): Unit = {
+    server.stop()
+  }
+
+  before {
+    init()
+  }
+
+  after {
+    stop()
+  }
+
+  val cases = Seq(
+    "application list json" -> "applications",
+    "completed app list json" -> "applications?status=completed",
+    "running app list json" -> "applications?status=running",
+    "minDate app list json" -> "applications?minDate=2015-02-10",
+    "maxDate app list json" -> "applications?maxDate=2015-02-10",
+    "maxDate2 app list json" -> "applications?maxDate=2015-02-03T10:42:40.000CST",
+    "one app json" -> "applications/local-1422981780767",
+    "one app multi-attempt json" -> "applications/local-1426533911241",
+    "job list json" -> "applications/local-1422981780767/jobs",
+    "job list from multi-attempt app json(1)" -> "applications/local-1426533911241/1/jobs",
+    "job list from multi-attempt app json(2)" -> "applications/local-1426533911241/2/jobs",
+    "one job json" -> "applications/local-1422981780767/jobs/0",
+    "succeeded job list json" -> "applications/local-1422981780767/jobs?status=succeeded",
+    "succeeded&failed job list json" ->
+      "applications/local-1422981780767/jobs?status=succeeded&status=failed",
+    "executor list json" -> "applications/local-1422981780767/executors",
+    "stage list json" -> "applications/local-1422981780767/stages",
+    "complete stage list json" -> "applications/local-1422981780767/stages?status=complete",
+    "failed stage list json" -> "applications/local-1422981780767/stages?status=failed",
+    "one stage json" -> "applications/local-1422981780767/stages/1",
+    "one stage attempt json" -> "applications/local-1422981780767/stages/1/0",
+
+    "stage task summary" -> "applications/local-1427397477963/stages/20/0/taskSummary",
+    "stage task summary w/ custom quantiles" ->
+      "applications/local-1427397477963/stages/20/0/taskSummary?quantiles=0.01,0.5,0.99",
+
+    "stage task list" -> "applications/local-1427397477963/stages/20/0/taskList",
+    "stage task list w/ offset & length" ->
+      "applications/local-1427397477963/stages/20/0/taskList?offset=10&length=50",
+    "stage task list w/ sortBy" ->
+      "applications/local-1427397477963/stages/20/0/taskList?sortBy=DECREASING_RUNTIME",
+    "stage task list w/ sortBy short names: -runtime" ->
+      "applications/local-1427397477963/stages/20/0/taskList?sortBy=-runtime",
+    "stage task list w/ sortBy short names: runtime" ->
+      "applications/local-1427397477963/stages/20/0/taskList?sortBy=runtime",
+
+    "stage list with accumulable json" -> "applications/local-1426533911241/1/stages",
+    "stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0",
+    "stage task list from multi-attempt app json(1)" ->
+      "applications/local-1426533911241/1/stages/0/0/taskList",
+    "stage task list from multi-attempt app json(2)" ->
+      "applications/local-1426533911241/2/stages/0/0/taskList",
+
+    "rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
+    "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
+  )
+
+  // run a bunch of characterization tests -- just verify the behavior is the same as what is
+  // saved in the test resource folder
+  cases.foreach { case (name, path) =>
+    test(name) {
+      val (code, jsonOpt, errOpt) = getContentAndCode(path)
+      code should be (HttpServletResponse.SC_OK)
+      jsonOpt should be ('defined)
+      errOpt should be (None)
+      val json = jsonOpt.get
+      val exp = IOUtils.toString(new FileInputStream(
+        new File(expRoot, path + "/json_expectation")))
+      // compare the ASTs so formatting differences don't cause failures
+      import org.json4s._
+      import org.json4s.jackson.JsonMethods._
+      val jsonAst = parse(json)
+      val expAst = parse(exp)
+      assertValidDataInJson(jsonAst, expAst)
+    }
+  }
+
+  test("response codes on bad paths") {
+    val badAppId = getContentAndCode("applications/foobar")
+    badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
+    badAppId._3 should be (Some("unknown app: foobar"))
+
+    val badStageId = getContentAndCode("applications/local-1422981780767/stages/12345")
+    badStageId._1 should be (HttpServletResponse.SC_NOT_FOUND)
+    badStageId._3 should be (Some("unknown stage: 12345"))
+
+    val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1")
+    badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND)
+    badStageAttemptId._3 should be (Some("unknown attempt for stage 1.  Found attempts: [0]"))
+
+    val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam")
+    badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND)
+    // will take some mucking w/ jersey to get a better error msg in this case
+
+    val badQuantiles = getContentAndCode(
+      "applications/local-1427397477963/stages/20/0/taskSummary?quantiles=foo,0.1")
+    badQuantiles._1 should be (HttpServletResponse.SC_BAD_REQUEST)
+    badQuantiles._3 should be (Some("Bad value for parameter \"quantiles\".  Expected a double, " +
+      "got \"foo\""))
+
+    getContentAndCode("foobar")._1 should be (HttpServletResponse.SC_NOT_FOUND)
+  }
 
   test("generate history page with relative links") {
     val historyServer = mock[HistoryServer]
@@ -54,4 +193,70 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
     } yield (attrs.toString)
     justHrefs should contain(link)
   }
+
+  def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
+    HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/json/v1/$path"))
+  }
+
+  def getUrl(path: String): String = {
+    HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/json/v1/$path"))
+  }
+
+  def generateExpectation(path: String): Unit = {
+    val json = getUrl(path)
+    val dir = new File(expRoot, path)
+    dir.mkdirs()
+    val out = new FileWriter(new File(dir, "json_expectation"))
+    out.write(json)
+    out.close()
+  }
+}
+
+object HistoryServerSuite {
+  def main(args: Array[String]): Unit = {
+    // generate the "expected" results for the characterization tests.  Just 
blindly assume the
+    // current behavior is correct, and write out the returned json to the 
test/resource files
+
+    val suite = new HistoryServerSuite
+    FileUtils.deleteDirectory(suite.expRoot)
+    suite.expRoot.mkdirs()
+    try {
+      suite.init()
+      suite.cases.foreach { case (_, path) =>
+        suite.generateExpectation(path)
+      }
+    } finally {
+      suite.stop()
+    }
+  }
+
+  def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
+    val connection = url.openConnection().asInstanceOf[HttpURLConnection]
+    connection.setRequestMethod("GET")
+    connection.connect()
+    val code = connection.getResponseCode()
+    val inString = try {
+      val in = Option(connection.getInputStream())
+      in.map{IOUtils.toString}
+    } catch {
+      case io: IOException => None
+    }
+    val errString = try {
+      val err = Option(connection.getErrorStream())
+      err.map{IOUtils.toString}
+    } catch {
+      case io: IOException => None
+    }
+    (code, inString, errString)
+  }
+
+  def getUrl(path: URL): String = {
+    val (code, resultOpt, error) = getContentAndCode(path)
+    if (code == 200) {
+      resultOpt.get
+    } else {
+      throw new RuntimeException(
+        "got code: " + code + " when getting " + path + " w/ error: " + error)
+    }
+  }
 }
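
As a usage note, the static helpers added above can also be exercised outside the test runner; a minimal sketch, assuming a history server is already listening on localhost:18080 (the object name below is hypothetical):

import java.net.URL

import org.apache.spark.deploy.history.HistoryServerSuite

// Probe one endpoint and inspect the three values getContentAndCode returns:
// the HTTP status code, the response body (if any), and the error body (if any).
object ProbeHistoryServer {
  def main(args: Array[String]): Unit = {
    val url = new URL("http://localhost:18080/json/v1/applications")
    val (code, body, err) = HistoryServerSuite.getContentAndCode(url)
    println(s"HTTP $code")                        // 200 on success
    body.foreach(b => println(b.take(200)))       // start of the JSON payload
    err.foreach(e => println(s"error body: $e"))  // populated on 4xx/5xx
  }
}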

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala b/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala
new file mode 100644
index 0000000..5274df9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import org.scalatest.{Matchers, FunSuite}
+
+class SimpleDateParamTest extends FunSuite with Matchers {
+
+  test("date parsing") {
+    new SimpleDateParam("2015-02-20T23:21:17.190GMT").timestamp should be (1424474477190L)
+    new SimpleDateParam("2015-02-20T17:21:17.190CST").timestamp should be (1424474477190L)
+    new SimpleDateParam("2015-02-20").timestamp should be (1424390400000L)  // GMT
+  }
+
+}
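
SimpleDateParam itself is not part of this excerpt, so as a rough sketch of the parsing behavior these cases imply (a timezone-suffixed timestamp, with a date-only fallback interpreted as GMT) -- the object and method names here are illustrative, not the committed implementation:

import java.text.{ParseException, SimpleDateFormat}
import java.util.TimeZone

object DateParamSketch {
  // First try a full timestamp with a timezone suffix; fall back to a bare
  // date, which the tests above expect to be interpreted as GMT.
  def parseTimestamp(value: String): Long = {
    val full = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSzzz")
    try {
      full.parse(value).getTime
    } catch {
      case _: ParseException =>
        val dateOnly = new SimpleDateFormat("yyyy-MM-dd")
        dateOnly.setTimeZone(TimeZone.getTimeZone("GMT"))
        dateOnly.parse(value).getTime
    }
  }
}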

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index d53d7f3..117b2c3 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.ui
 
 import java.net.{HttpURLConnection, URL}
-import javax.servlet.http.HttpServletRequest
+import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
 
 import scala.collection.JavaConversions._
 import scala.xml.Node
 
+import org.json4s._
+import org.json4s.jackson.JsonMethods
 import org.openqa.selenium.htmlunit.HtmlUnitDriver
 import org.openqa.selenium.{By, WebDriver}
 import org.scalatest._
@@ -33,8 +35,9 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark._
 import org.apache.spark.api.java.StorageLevels
+import org.apache.spark.deploy.history.HistoryServerSuite
 import org.apache.spark.shuffle.FetchFailedException
-
+import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}
 
 /**
  * Selenium tests for the Spark Web UI.
@@ -42,6 +45,8 @@ import org.apache.spark.shuffle.FetchFailedException
 class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll {
 
   implicit var webDriver: WebDriver = _
+  implicit val formats = DefaultFormats
+
 
   override def beforeAll(): Unit = {
     webDriver = new HtmlUnitDriver
@@ -76,28 +81,42 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       val rdd = sc.parallelize(Seq(1, 2, 3))
       rdd.persist(StorageLevels.DISK_ONLY).count()
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (ui.appUIAddress.stripSuffix("/") + "/storage")
+        goToUi(ui, "/storage")
         val tableRowText = findAll(cssSelector("#storage-by-rdd-table 
td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.DISK_ONLY.description)
       }
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0")
+        goToUi(ui, "/storage/rdd/?id=0")
         val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table 
td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.DISK_ONLY.description)
       }
 
+      val storageJson = getJson(ui, "storage/rdd")
+      storageJson.children.length should be (1)
+      (storageJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)
+      val rddJson = getJson(ui, "storage/rdd/0")
+      (rddJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)
+
       rdd.unpersist()
       rdd.persist(StorageLevels.MEMORY_ONLY).count()
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (ui.appUIAddress.stripSuffix("/") + "/storage")
+        goToUi(ui, "/storage")
         val tableRowText = findAll(cssSelector("#storage-by-rdd-table 
td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
       }
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0")
+        goToUi(ui, "/storage/rdd/?id=0")
         val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table 
td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
       }
+
+      val updatedStorageJson = getJson(ui, "storage/rdd")
+      updatedStorageJson.children.length should be (1)
+      (updatedStorageJson \ "storageLevel").extract[String] should be (
+        StorageLevels.MEMORY_ONLY.description)
+      val updatedRddJson = getJson(ui, "storage/rdd/0")
+      (updatedRddJson  \ "storageLevel").extract[String] should be (
+        StorageLevels.MEMORY_ONLY.description)
     }
   }
 
@@ -108,10 +127,13 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
         sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
       }
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
+        goToUi(sc, "/stages")
         find(id("active")) should be(None)  // Since we hide empty tables
         find(id("failed")).get.text should be("Failed Stages (1)")
       }
+      val stageJson = getJson(sc.ui.get, "stages")
+      stageJson.children.length should be (1)
+      (stageJson \ "status").extract[String] should be (StageStatus.FAILED.name())
 
       // Regression test for SPARK-2105
       class NotSerializable
@@ -120,12 +142,15 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
         sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
       }
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
+        goToUi(sc, "/stages")
         find(id("active")) should be(None)  // Since we hide empty tables
         // The failure occurs before the stage becomes active, hence we should still show only one
         // failed stage, not two:
         find(id("failed")).get.text should be("Failed Stages (1)")
       }
+
+      val updatedStageJson = getJson(sc.ui.get, "stages")
+      updatedStageJson should be (stageJson)
     }
   }
 
@@ -138,7 +163,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       runSlowJob(sc)
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
+        goToUi(sc, "/stages")
         assert(hasKillLink)
       }
     }
@@ -146,7 +171,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
     withSpark(newSparkContext(killEnabled = false)) { sc =>
       runSlowJob(sc)
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
+        goToUi(sc, "/stages")
         assert(!hasKillLink)
       }
     }
@@ -157,7 +182,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       // If no job has been run in a job group, then "(Job Group)" should not appear in the header
       sc.parallelize(Seq(1, 2, 3)).count()
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        goToUi(sc, "/jobs")
         val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
         tableHeaders should not contain "Job Id (Job Group)"
       }
@@ -165,10 +190,22 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       sc.setJobGroup("my-job-group", "my-job-group-description")
       sc.parallelize(Seq(1, 2, 3)).count()
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        goToUi(sc, "/jobs")
         val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
         tableHeaders should contain ("Job Id (Job Group)")
       }
+
+      val jobJson = getJson(sc.ui.get, "jobs")
+      for {
+        job @ JObject(_) <- jobJson
+        JInt(jobId) <- job \ "jobId"
+        jobGroup = job \ "jobGroup"
+      } {
+        jobId.toInt match {
+          case 0 => jobGroup should be (JNothing)
+          case 1 => jobGroup should be (JString("my-job-group"))
+        }
+      }
     }
   }
 
@@ -195,7 +232,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       }
       mappedData.count()
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        goToUi(sc, "/jobs")
         find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 
failed)")
         // Ideally, the following test would pass, but currently we overcount 
completed tasks
         // if task recomputations occur:
@@ -204,6 +241,32 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
         // of completed tasks may be higher:
         find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 
(1 failed)")
       }
+      val jobJson = getJson(sc.ui.get, "jobs")
+      (jobJson \ "numTasks").extract[Int] should be (2)
+      (jobJson \ "numCompletedTasks").extract[Int] should be (3)
+      (jobJson \ "numFailedTasks").extract[Int] should be (1)
+      (jobJson \ "numCompletedStages").extract[Int] should be (2)
+      (jobJson \ "numFailedStages").extract[Int] should be (1)
+      val stageJson = getJson(sc.ui.get, "stages")
+
+      for {
+        stage @ JObject(_) <- stageJson
+        JString(status) <- stage \ "status"
+        JInt(stageId) <- stage \ "stageId"
+        JInt(attemptId) <- stage \ "attemptId"
+      } {
+        val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE
+        status should be (exp.name())
+      }
+
+      for {
+        stageId <- 0 to 1
+        attemptId <- 0 to 1
+      } {
+        val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE
+        val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId")
+        (stageJson \ "status").extract[String] should be (exp.name())
+      }
     }
   }
 
@@ -218,7 +281,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       // Start the job:
       rdd.countAsync()
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=0")
+        goToUi(sc, "/jobs/job/?id=0")
         find(id("active")).get.text should be ("Active Stages (1)")
         find(id("pending")).get.text should be ("Pending Stages (2)")
         // Essentially, we want to check that none of the stage rows show
@@ -244,7 +307,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       rdd.count()
       rdd.count()
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        goToUi(sc, "/jobs")
         // The completed jobs table should have two rows. The first row will be the most recent job:
         val firstRow = find(cssSelector("tbody tr")).get.underlying
         val firstRowColumns = firstRow.findElements(By.tagName("td"))
@@ -271,7 +334,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       rdd.count()
       rdd.count()
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=1")
+        goToUi(sc, "/jobs/job/?id=1")
         find(id("pending")) should be (None)
         find(id("active")) should be (None)
         find(id("failed")) should be (None)
@@ -299,7 +362,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       rdd.count()
       rdd.count()
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        goToUi(sc, "/jobs")
         findAll(cssSelector("tbody tr a")).foreach { link =>
           link.text.toLowerCase should include ("count")
           link.text.toLowerCase should not include "unknown"
@@ -321,7 +384,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       }
       sparkUI.attachTab(newTab)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+        goToUi(sc, "")
         find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
@@ -330,12 +393,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       }
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         // check whether new page exists
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+        goToUi(sc, "/foo")
         find(cssSelector("b")).get.text should include ("html magic")
       }
       sparkUI.detachTab(newTab)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+        goToUi(sc, "")
         find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
@@ -344,7 +407,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       }
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         // check new page not exist
-        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+        goToUi(sc, "/foo")
         find(cssSelector("b")) should be(None)
       }
     }
@@ -371,4 +434,163 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       }
     }
   }
+
+  test("stage & job retention") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+      .set("spark.ui.port", "0")
+      .set("spark.ui.retainedStages", "3")
+      .set("spark.ui.retainedJobs", "2")
+    val sc = new SparkContext(conf)
+    assert(sc.ui.isDefined)
+
+    withSpark(sc) { sc =>
+      // run a few jobs & stages ...
+      (0 until 5).foreach { idx =>
+        // NOTE: if we reverse the order, things don't really behave nicely
+        // we lose the stage for a job we keep, and then the job doesn't know
+        // about its last stage
+        sc.parallelize(idx to (idx + 3)).map(identity).groupBy(identity).map(identity)
+          .groupBy(identity).count()
+        sc.parallelize(idx to (idx + 3)).collect()
+      }
+
+      val expJobInfo = Seq(
+        ("9", "collect"),
+        ("8", "count")
+      )
+
+      eventually(timeout(1 second), interval(50 milliseconds)) {
+        goToUi(sc, "/jobs")
+        // The completed jobs table should have two rows. The first row will be the most recent job:
+        find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2")
+        find("completed").get.text should be ("Completed Jobs (10, only showing 2)")
+        val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying}
+        rows.size should be (expJobInfo.size)
+        for {
+          (row, idx) <- rows.zipWithIndex
+          columns = row.findElements(By.tagName("td"))
+          id = columns(0).getText()
+          description = columns(1).getText()
+        } {
+          id should be (expJobInfo(idx)._1)
+          description should include (expJobInfo(idx)._2)
+        }
+      }
+
+      val jobsJson = getJson(sc.ui.get, "jobs")
+      jobsJson.children.size should be (expJobInfo.size)
+      for {
+        (job @ JObject(_), idx) <- jobsJson.children.zipWithIndex
+        id = (job \ "jobId").extract[String]
+        name = (job \ "name").extract[String]
+      } {
+        withClue(s"idx = $idx; id = $id; name = ${name.substring(0,20)}") {
+          id should be (expJobInfo(idx)._1)
+          name should include (expJobInfo(idx)._2)
+        }
+      }
+
+      // what about when we query for a job that did exist, but has been cleared?
+      goToUi(sc, "/jobs/job/?id=7")
+      find("no-info").get.text should be ("No information to display for job 7")
+
+      val badJob = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get, "jobs/7"))
+      badJob._1 should be (HttpServletResponse.SC_NOT_FOUND)
+      badJob._2 should be (None)
+      badJob._3 should be (Some("unknown job: 7"))
+
+      val expStageInfo = Seq(
+        ("19", "collect"),
+        ("18", "count"),
+        ("17", "groupBy")
+      )
+
+      eventually(timeout(1 second), interval(50 milliseconds)) {
+        goToUi(sc, "/stages")
+        find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3")
+        find("completed").get.text should be ("Completed Stages (20, only showing 3)")
+        val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying}
+        rows.size should be (3)
+        for {
+          (row, idx) <- rows.zipWithIndex
+          columns = row.findElements(By.tagName("td"))
+          id = columns(0).getText()
+          description = columns(1).getText()
+        } {
+          id should be (expStageInfo(idx)._1)
+          description should include (expStageInfo(idx)._2)
+        }
+      }
+
+      val stagesJson = getJson(sc.ui.get, "stages")
+      stagesJson.children.size should be (3)
+      for {
+        (stage @ JObject(_), idx) <- stagesJson.children.zipWithIndex
+        id = (stage \ "stageId").extract[String]
+        name = (stage \ "name").extract[String]
+      } {
+        id should be (expStageInfo(idx)._1)
+        name should include (expStageInfo(idx)._2)
+      }
+
+      // nonexistent stage
+
+      goToUi(sc, "/stages/stage/?id=12&attempt=0")
+      find("no-info").get.text should be ("No information to display for Stage 
12 (Attempt 0)")
+      val badStage = 
HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/12/0"))
+      badStage._1 should be (HttpServletResponse.SC_NOT_FOUND)
+      badStage._2 should be (None)
+      badStage._3 should be (Some("unknown stage: 12"))
+
+      val badAttempt = 
HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/19/15"))
+      badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND)
+      badAttempt._2 should be (None)
+      badAttempt._3 should be (Some("unknown attempt for stage 19.  Found 
attempts: [0]"))
+
+      val badStageAttemptList = HistoryServerSuite.getContentAndCode(
+        jsonUrl(sc.ui.get, "stages/12"))
+      badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND)
+      badStageAttemptList._2 should be (None)
+      badStageAttemptList._3 should be (Some("unknown stage: 12"))
+    }
+  }
+
+  test("live UI json application list") {
+    withSpark(newSparkContext()) { sc =>
+      val appListRawJson = HistoryServerSuite.getUrl(new URL(
+        sc.ui.get.appUIAddress + "/json/v1/applications"))
+      val appListJsonAst = JsonMethods.parse(appListRawJson)
+      appListJsonAst.children.length should be (1)
+      val attempts = (appListJsonAst \ "attempts").children
+      attempts.size should be (1)
+      (attempts(0) \ "completed").extract[Boolean] should be (false)
+      parseDate(attempts(0) \ "startTime") should be (sc.startTime)
+      parseDate(attempts(0) \ "endTime") should be (-1)
+      val oneAppJsonAst = getJson(sc.ui.get, "")
+      oneAppJsonAst should be (appListJsonAst.children(0))
+    }
+  }
+
+  def goToUi(sc: SparkContext, path: String): Unit = {
+    goToUi(sc.ui.get, path)
+  }
+
+  def goToUi(ui: SparkUI, path: String): Unit = {
+    go to (ui.appUIAddress.stripSuffix("/") + path)
+  }
+
+  def parseDate(json: JValue): Long = {
+    JacksonMessageWriter.makeISODateFormat.parse(json.extract[String]).getTime
+  }
+
+  def getJson(ui: SparkUI, path: String): JValue = {
+    JsonMethods.parse(HistoryServerSuite.getUrl(jsonUrl(ui, path)))
+  }
+
+  def jsonUrl(ui: SparkUI, path: String): URL = {
+    new URL(ui.appUIAddress + "/json/v1/applications/test/" + path)
+  }
 }
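
A side note on the json4s idioms used throughout these assertions: `\` selects a field from a parsed AST and `extract` converts it to a Scala type (given the implicit DefaultFormats declared at the top of the suite). A standalone sketch, independent of Spark:

import org.json4s._
import org.json4s.jackson.JsonMethods

object Json4sExtractionSketch {
  implicit val formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    val ast = JsonMethods.parse("""{"stageId": 1, "status": "FAILED"}""")
    // `\` navigates to a field; `extract` converts the JValue to a Scala type.
    val status = (ast \ "status").extract[String]
    val stageId = (ast \ "stageId").extract[Int]
    println(s"stage $stageId is $status")  // prints: stage 1 is FAILED
  }
}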

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 8a85928..1e0fc15 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -174,6 +174,80 @@ making it easy to identify slow tasks, data skew, etc.
 
 Note that the history server only displays completed Spark jobs. One way to signal the completion of a Spark job is to stop the Spark Context explicitly (`sc.stop()`), or in Python using the `with SparkContext() as sc:` to handle the Spark Context setup and tear down, and still show the job history on the UI.
 
+## REST API
+
+In addition to viewing the metrics in the UI, you can also retrieve them as JSON.  This gives
+developers an easy way to create new visualizations and monitoring tools for Spark.  The JSON is
+available for both running applications and the history server.  The endpoints are mounted at
+`/json/v1`.  E.g., for the history server, they would typically be accessible at
+`http://<server-url>:18080/json/v1`, and for a running application, at
+`http://localhost:4040/json/v1`.
+
+<table class="table">
+  <tr><th>Endpoint</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>/applications</code></td>
+    <td>A list of all applications</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/jobs</code></td>
+    <td>A list of all jobs for a given application</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/jobs/[job-id]</code></td>
+    <td>Details for the given job</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/stages</code></td>
+    <td>A list of all stages for a given application</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/stages/[stage-id]</code></td>
+    <td>A list of all attempts for the given stage</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]</code></td>
+    <td>Details for the given stage attempt</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary</code></td>
+    <td>Summary metrics of all tasks in the given stage attempt</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList</code></td>
+    <td>A list of all tasks for the given stage attempt</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/executors</code></td>
+    <td>A list of all executors for the given application</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/storage/rdd</code></td>
+    <td>A list of stored RDDs for the given application</td>
+  </tr>
+  <tr>
+    <td><code>/applications/[app-id]/storage/rdd/[rdd-id]</code></td>
+    <td>Details for the storage status of a given RDD</td>
+  </tr>
+</table>
+
+When running on YARN, each application may have multiple attempts, so `[app-id]` is actually
+`[app-id]/[attempt-id]` in all cases.
+
+These endpoints have been strongly versioned to make it easier to develop applications on top.
+In particular, Spark guarantees:
+
+* Endpoints will never be removed from a given version
+* Individual fields will never be removed for any given endpoint
+* New endpoints may be added
+* New fields may be added to existing endpoints
+* New versions of the API may be added in the future at a separate endpoint (e.g., `json/v2`).  New versions are *not* required to be backwards compatible.
+* API versions may be dropped, but only after at least one minor release of co-existing with a new API version
+
+Note that even when examining the UI of a running application, the `applications/[app-id]` portion
+is still required, though there is only one application available.  E.g., to see the list of jobs
+for the running app, you would go to `http://localhost:4040/json/v1/applications/[app-id]/jobs`.
+This is to keep the paths consistent in both modes.
+
 # Metrics
 
 Spark has a configurable metrics system based on the 
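
To make the documented endpoints concrete, a small illustrative client (not part of the commit; the port and application id are placeholders you would substitute with your own):

import scala.io.Source

object RestApiExample {
  // Fetch one endpoint from a running application's UI, assumed on localhost:4040.
  def get(path: String): String = {
    val src = Source.fromURL(s"http://localhost:4040/json/v1/$path")
    try src.mkString finally src.close()
  }

  def main(args: Array[String]): Unit = {
    val appId = "local-1422981780767"                 // placeholder app id
    println(get("applications"))                      // list all applications
    println(get(s"applications/$appId/jobs"))         // jobs for one application
    println(get(s"applications/$appId/storage/rdd"))  // stored RDDs
  }
}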

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8b4499/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 12ad7c5..94cf283 100644
--- a/pom.xml
+++ b/pom.xml
@@ -624,6 +624,18 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>com.sun.jersey</groupId>
+        <artifactId>jersey-server</artifactId>
+        <version>1.9</version>
+        <scope>${hadoop.deps.scope}</scope>
+      </dependency>
+      <dependency>
+        <groupId>com.sun.jersey</groupId>
+        <artifactId>jersey-core</artifactId>
+        <version>1.9</version>
+        <scope>${hadoop.deps.scope}</scope>
+      </dependency>
+      <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>
         <version>${scala.version}</version>

