Repository: spark
Updated Branches:
  refs/heads/branch-1.2 a689ab98d -> 96f76fc40


[SPARK-4601][Streaming] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI

When running NetworkWordCount, the descriptions of the word count jobs are set
to "getCallsite at DStream:xxx". They should instead point to the line of the
streaming application containing the output operation that led to the job
being created. This happens because the call site is not set correctly in the
thread that launches the jobs. This PR fixes that by setting the call site
inside each job's function, so that it takes effect on the thread that runs
the job.
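A minimal, self-contained sketch of why setting the call site inside the job function helps. It assumes, as a simplification, that the call site is kept in plain thread-local state; Spark itself stores it in SparkContext's per-thread local properties, so details differ. `MiniContext`, `runOnJobThread`, this `CallSite` case class and the `MyStreamingApp.scala:10` site are hypothetical stand-ins, not Spark's API. A value set only on the thread that declares the job is not seen by the job thread, while setting it at the start of the job closure (what the patch does with `ssc.sparkContext.setCallSite(creationSite)`) is.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical stand-in for Spark's call-site record (short and long forms).
final case class CallSite(shortForm: String, longForm: String)

// Hypothetical, simplified context: the call site lives in a plain
// ThreadLocal, so a value set on one thread is invisible on another.
class MiniContext {
  private val local = new ThreadLocal[CallSite]

  def setCallSite(site: CallSite): Unit = local.set(site)

  // With nothing set on the current thread, fall back to a generic
  // placeholder, much like the uninformative description seen on the UI.
  def getCallSite(): CallSite =
    Option(local.get()).getOrElse(CallSite("default call site", ""))
}

object CallSiteDemo {
  // Runs `jobFunc` on a separate "job" thread and reports the call site
  // that thread sees once the job function has run.
  def runOnJobThread(ctx: MiniContext, jobFunc: () => Unit): CallSite = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      pool.submit(new Callable[CallSite] {
        def call(): CallSite = { jobFunc(); ctx.getCallSite() }
      }).get(10, TimeUnit.SECONDS)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val ctx = new MiniContext
    // Site recorded when the output operation was declared (illustrative).
    val creationSite = CallSite("foreachRDD at MyStreamingApp.scala:10", "")

    // Before the patch: the site is set only on the declaring thread.
    ctx.setCallSite(creationSite)
    val before = runOnJobThread(ctx, () => ())

    // After the patch: the job function sets the site on the thread running it.
    val after = runOnJobThread(ctx, () => ctx.setCallSite(creationSite))

    println(s"before: ${before.shortForm}")
    println(s"after:  ${after.shortForm}")
  }
}
```

Running `CallSiteDemo.main` reports the placeholder for `before` and `foreachRDD at MyStreamingApp.scala:10` for `after`. Setting the site at the start of the job closure, rather than where the job object is created, ties it to whichever thread ends up executing the job.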

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #3455 from tdas/streaming-callsite-fix and squashes the following commits:

69fc26f [Tathagata Das] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI

(cherry picked from commit 69cd53eae205eb10d52eaf38466db58a23b6ae81)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96f76fc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96f76fc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96f76fc4

Branch: refs/heads/branch-1.2
Commit: 96f76fc405d1da181ed9edc733a897437ee0a6e0
Parents: a689ab9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Nov 25 06:50:36 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Nov 25 06:50:51 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/ForEachDStream.scala    | 1 +
 .../org/apache/spark/streaming/StreamingContextSuite.scala     | 6 +++++-
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96f76fc4/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 905bc72..1361c30 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] (
     parent.getOrCompute(time) match {
       case Some(rdd) =>
         val jobFunc = () => {
+          ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }
         Some(new Job(time, jobFunc))

http://git-wip-us.apache.org/repos/asf/spark/blob/96f76fc4/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 4b49c4d..9f352bd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -336,16 +336,20 @@ package object testPackage extends Assertions {
 
       // Verify creation site of generated RDDs
       var rddGenerated = false
-      var rddCreationSiteCorrect = true
+      var rddCreationSiteCorrect = false
+      var foreachCallSiteCorrect = false
 
       inputStream.foreachRDD { rdd =>
         rddCreationSiteCorrect = rdd.creationSite == creationSite
+        foreachCallSiteCorrect =
+          rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite")
         rddGenerated = true
       }
       ssc.start()
 
       eventually(timeout(10000 millis), interval(10 millis)) {
         assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+        assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
       }
     } finally {
       ssc.stop()

