Repository: spark Updated Branches: refs/heads/master d24076019 -> 69cd53eae
[SPARK-4601][Streaming] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI When running the NetworkWordCount, the description of the word count jobs is set as "getCallsite at DStream:xxx". It should instead be set to the line number of the streaming application that has the output operation that led to the job being created. The description is wrong because the call site is incorrectly set in the thread launching the jobs. This PR fixes that. Author: Tathagata Das <[email protected]> Closes #3455 from tdas/streaming-callsite-fix and squashes the following commits: 69fc26f [Tathagata Das] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69cd53ea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69cd53ea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69cd53ea Branch: refs/heads/master Commit: 69cd53eae205eb10d52eaf38466db58a23b6ae81 Parents: d240760 Author: Tathagata Das <[email protected]> Authored: Tue Nov 25 06:50:36 2014 -0800 Committer: Tathagata Das <[email protected]> Committed: Tue Nov 25 06:50:36 2014 -0800 ---------------------------------------------------------------------- .../org/apache/spark/streaming/dstream/ForEachDStream.scala | 1 + .../org/apache/spark/streaming/StreamingContextSuite.scala | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/69cd53ea/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 905bc72..1361c30 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] ( parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => { + ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) http://git-wip-us.apache.org/repos/asf/spark/blob/69cd53ea/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 4b49c4d..9f352bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -336,16 +336,20 @@ package object testPackage extends Assertions { // Verify creation site of generated RDDs var rddGenerated = false - var rddCreationSiteCorrect = true + var rddCreationSiteCorrect = false + var foreachCallSiteCorrect = false inputStream.foreachRDD { rdd => rddCreationSiteCorrect = rdd.creationSite == creationSite + foreachCallSiteCorrect = + rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite") rddGenerated = true } ssc.start() eventually(timeout(10000 millis), interval(10 millis)) { assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct") + assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct") } } finally { ssc.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
