Repository: spark
Updated Branches:
  refs/heads/branch-1.3 67fa6d1f8 -> 47cce984e


[SPARK-6077] Remove streaming tab while stopping StreamingContext

Currently we would create a new streaming tab for each streamingContext even if 
there's already one on the same sparkContext which would cause duplicate 
StreamingTab created and none of them is taking effect.
snapshot: 
https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0
How to reproduce:
1)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.
{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, 
StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
.....
2)
ssc.stop(false)
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, 
StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()

Author: lisurprise <zhichao...@intel.com>

Closes #4828 from zhichao-li/master and squashes the following commits:

c329806 [lisurprise] add test for attaching/detaching streaming tab
51e6c7f [lisurprise] move detach method into StreamingTab
31a44fa [lisurprise] add unit test for attaching and detaching new tab
db25ed2 [lisurprise] clean code
8281bcb [lisurprise] clean code
193c542 [lisurprise] remove streaming tab while closing streaming context

(cherry picked from commit f149b8b5e542af44650923d0156f037121b45a20)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47cce984
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47cce984
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47cce984

Branch: refs/heads/branch-1.3
Commit: 47cce984eb6286fcedde8bf480442a66a87de09c
Parents: 67fa6d1
Author: lisurprise <zhichao...@intel.com>
Authored: Mon Mar 16 13:10:32 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Mar 16 13:10:46 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/ui/WebUI.scala  | 28 ++++--
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 50 ++++++++++-
 .../scala/org/apache/spark/ui/UISuite.scala     | 38 +-------
 streaming/pom.xml                               |  5 ++
 .../spark/streaming/StreamingContext.scala      |  1 +
 .../spark/streaming/ui/StreamingPage.scala      |  4 +-
 .../spark/streaming/ui/StreamingTab.scala       |  4 +
 .../spark/streaming/UISeleniumSuite.scala       | 95 ++++++++++++++++++++
 .../org/apache/spark/streaming/UISuite.scala    | 55 ------------
 9 files changed, 179 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index ec68837..ea548f2 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -20,14 +20,15 @@ package org.apache.spark.ui
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
 import scala.xml.Node
 
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.json4s.JsonAST.{JNothing, JValue}
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * The top level component of the UI hierarchy that contains the server.
@@ -45,6 +46,7 @@ private[spark] abstract class WebUI(
 
   protected val tabs = ArrayBuffer[WebUITab]()
   protected val handlers = ArrayBuffer[ServletContextHandler]()
+  protected val pageToHandlers = new HashMap[WebUIPage, 
ArrayBuffer[ServletContextHandler]]
   protected var serverInfo: Option[ServerInfo] = None
   protected val localHostName = Utils.localHostName()
   protected val publicHostName = 
Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
@@ -60,14 +62,30 @@ private[spark] abstract class WebUI(
     tab.pages.foreach(attachPage)
     tabs += tab
   }
+  
+  def detachTab(tab: WebUITab) {
+    tab.pages.foreach(detachPage)
+    tabs -= tab
+  }
+  
+  def detachPage(page: WebUIPage) {
+    pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
+  }
 
   /** Attach a page to this UI. */
   def attachPage(page: WebUIPage) {
     val pagePath = "/" + page.prefix
-    attachHandler(createServletHandler(pagePath,
-      (request: HttpServletRequest) => page.render(request), securityManager, 
basePath))
-    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
-      (request: HttpServletRequest) => page.renderJson(request), 
securityManager, basePath))
+    val renderHandler = createServletHandler(pagePath,
+      (request: HttpServletRequest) => page.render(request), securityManager, 
basePath)
+    val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + 
"/json",
+      (request: HttpServletRequest) => page.renderJson(request), 
securityManager, basePath)
+    attachHandler(renderHandler)
+    attachHandler(renderJsonHandler)
+    pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+      .append(renderHandler)
+    pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+      .append(renderJsonHandler)
+    
   }
 
   /** Attach a handler to this UI. */

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 6a97238..0d15598 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -17,20 +17,24 @@
 
 package org.apache.spark.ui
 
+import javax.servlet.http.HttpServletRequest
+
 import scala.collection.JavaConversions._
+import scala.xml.Node
 
-import org.openqa.selenium.{By, WebDriver}
 import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.{By, WebDriver}
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.selenium.WebBrowser
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
+import org.apache.spark._
 import org.apache.spark.api.java.StorageLevels
 import org.apache.spark.shuffle.FetchFailedException
 
+
 /**
  * Selenium tests for the Spark Web UI.
  */
@@ -310,4 +314,46 @@ class UISeleniumSuite extends FunSuite with WebBrowser 
with Matchers with Before
       }
     }
   }
+
+  test("attaching and detaching a new tab") {
+    withSpark(newSparkContext()) { sc =>
+      val sparkUI = sc.ui.get
+
+      val newTab = new WebUITab(sparkUI, "foo") {
+        attachPage(new WebUIPage("") {
+          def render(request: HttpServletRequest): Seq[Node] = {
+            <b>"html magic"</b>
+          }
+        })
+      }
+      sparkUI.attachTab(newTab)
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+        find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
+        find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
+        find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
+        find(cssSelector("""ul li a[href*="environment"]""")) should not 
be(None)
+        find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
+      }
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        // check whether new page exists
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+        find(cssSelector("b")).get.text should include ("html magic")
+      }
+      sparkUI.detachTab(newTab)
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+        find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
+        find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
+        find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
+        find(cssSelector("""ul li a[href*="environment"]""")) should not 
be(None)
+        find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
+      }
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        // check new page not exist
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+        find(cssSelector("b")) should be(None)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 92a21f8..77a038d 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.ui
 
 import java.net.ServerSocket
-import javax.servlet.http.HttpServletRequest
 
 import scala.io.Source
 import scala.util.{Failure, Success, Try}
@@ -28,9 +27,8 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.LocalSparkContext._
-import scala.xml.Node
+import org.apache.spark.{SparkConf, SparkContext}
 
 class UISuite extends FunSuite {
 
@@ -72,40 +70,6 @@ class UISuite extends FunSuite {
     }
   }
 
-  ignore("attaching a new tab") {
-    withSpark(newSparkContext()) { sc =>
-      val sparkUI = sc.ui.get
-
-      val newTab = new WebUITab(sparkUI, "foo") {
-        attachPage(new WebUIPage("") {
-          def render(request: HttpServletRequest): Seq[Node] = {
-            <b>"html magic"</b>
-          }
-        })
-      }
-      sparkUI.attachTab(newTab)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sparkUI.appUIAddress).mkString
-        assert(!html.contains("random data that should not be present"))
-
-        // check whether new page exists
-        assert(html.toLowerCase.contains("foo"))
-
-        // check whether other pages still exist
-        assert(html.toLowerCase.contains("stages"))
-        assert(html.toLowerCase.contains("storage"))
-        assert(html.toLowerCase.contains("environment"))
-        assert(html.toLowerCase.contains("executors"))
-      }
-
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + 
"/foo").mkString
-        // check whether new page exists
-        assert(html.contains("magic"))
-      }
-    }
-  }
-
   test("jetty selects different port under contention") {
     val server = new ServerSocket(0)
     val startPort = server.getLocalPort

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 72e4465..3d4fb2d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -83,6 +83,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.novocode</groupId>
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba3f234..b5b6770 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -578,6 +578,7 @@ class StreamingContext private[streaming] (
     // Even if we have already stopped, we still need to attempt to stop the 
SparkContext because
     // a user might stop(stopSparkContext = false) and then call 
stop(stopSparkContext = true).
     if (stopSparkContext) sc.stop()
+    uiTab.foreach(_.detach())
     // The state should always be Stopped after calling `stop()`, even if we 
haven't started yet:
     state = Stopped
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 98e9a2e..bfe8086 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -32,7 +32,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   extends WebUIPage("") with Logging {
 
   private val listener = parent.listener
-  private val startTime = Calendar.getInstance().getTime()
+  private val startTime = System.currentTimeMillis()
   private val emptyCell = "-"
 
   /** Render the page */
@@ -47,7 +47,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
 
   /** Generate basic stats of the streaming program */
   private def generateBasicStats(): Seq[Node] = {
-    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    val timeSinceStart = System.currentTimeMillis() - startTime
     <ul class ="unstyled">
       <li>
         <strong>Started at: </strong> {startTime.toString}

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index d9d04cd..9a860ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -36,6 +36,10 @@ private[spark] class StreamingTab(ssc: StreamingContext)
   ssc.addStreamingListener(listener)
   attachPage(new StreamingPage(this))
   parent.attachTab(this)
+
+  def detach() {
+    getSparkUI(ssc).detachTab(this)
+  }
 }
 
 private object StreamingTab {

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
new file mode 100644
index 0000000..87a0395
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.selenium.WebBrowser
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark._
+
+
+
+
+/**
+ * Selenium tests for the Spark Web UI.
+ */
+class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with 
BeforeAndAfterAll with TestSuiteBase {
+
+  implicit var webDriver: WebDriver = _
+
+  override def beforeAll(): Unit = {
+    webDriver = new HtmlUnitDriver
+  }
+
+  override def afterAll(): Unit = {
+    if (webDriver != null) {
+      webDriver.quit()
+    }
+  }
+
+  /**
+   * Create a test SparkStreamingContext with the SparkUI enabled.
+   */
+  private def newSparkStreamingContext(): StreamingContext = {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.ui.enabled", "true")
+    val ssc = new StreamingContext(conf, Seconds(1))
+    assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+    ssc
+  }
+
+  test("attaching and detaching a Streaming tab") {
+    withStreamingContext(newSparkStreamingContext()) { ssc =>
+      val sparkUI = ssc.sparkContext.ui.get
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sparkUI.appUIAddress.stripSuffix("/"))
+        find(cssSelector( """ul li a[href*="streaming"]""")) should not be 
(None)
+      }
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        // check whether streaming page exists
+        go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
+        val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
+        statisticText should contain("Network receivers:")
+        statisticText should contain("Batch interval:")
+      }
+
+      ssc.stop(false)
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sparkUI.appUIAddress.stripSuffix("/"))
+        find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
+      }
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
+        val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
+        statisticText should not contain ("Network receivers:")
+        statisticText should not contain ("Batch interval:")
+      }
+    }
+  }
+}
+  

http://git-wip-us.apache.org/repos/asf/spark/blob/47cce984/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
deleted file mode 100644
index 8e30118..0000000
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import scala.io.Source
-
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkConf
-
-class UISuite extends FunSuite {
-
-  // Ignored: See SPARK-1530
-  ignore("streaming tab in spark UI") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set("spark.ui.enabled", "true")
-    val ssc = new StreamingContext(conf, Seconds(1))
-    assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
-    val ui = ssc.sc.ui.get
-
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(ui.appUIAddress).mkString
-      assert(!html.contains("random data that should not be present"))
-      // test if streaming tab exist
-      assert(html.toLowerCase.contains("streaming"))
-      // test if other Spark tabs still exist
-      assert(html.toLowerCase.contains("stages"))
-    }
-
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + 
"/streaming").mkString
-      assert(html.toLowerCase.contains("batch"))
-      assert(html.toLowerCase.contains("network"))
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to