Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f83b6e625 -> d0e6e5312


[SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant job 
description in streaming jobs

**Note that this PR is only for branch 1.5. See #8781 for the fix on the Spark 
master branch.**

The job group and job description are passed through thread-local properties 
and are inherited by child threads. In the case of Spark Streaming, the 
streaming jobs inherit these properties from the thread that called 
streamingContext.start(), which may not make sense (see the sketch after the 
list below).

1. Job group: This is mainly used for cancelling a group of jobs together. It 
does not make sense to cancel streaming jobs like this, as the effect would be 
unpredictable. It is not a valid use case anyway; to cancel a streaming 
context, call streamingContext.stop().

2. Job description: This is used to attach human-readable descriptions to jobs 
so they show up in the UI. The job description of the thread that calls 
streamingContext.start() is not useful for the streaming jobs, since it does 
not make sense for all of them to share the same description, and that 
description may not be related to streaming at all.
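
For context, Spark keeps these local properties in an `InheritableThreadLocal`, 
which is why any thread spawned after they are set sees them. Below is a 
minimal, standalone sketch of that inheritance behaviour (illustrative only, 
not part of this patch; the demo object and property value are made up, though 
`spark.jobGroup.id` is the real property key):

```scala
import java.util.Properties

// Standalone sketch (not Spark code): an InheritableThreadLocal is what makes
// child threads see the parent thread's "local properties", which is how the
// streaming scheduler thread ends up with the caller's job group/description.
object LocalPropertyInheritanceDemo {
  private val props = new InheritableThreadLocal[Properties] {
    // In Spark 1.5 the parent's Properties object is handed to the child as-is
    // unless spark.localProperties.clone is set, in which case it is cloned.
    override def childValue(parent: Properties): Properties = parent
    override def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    props.get().setProperty("spark.jobGroup.id", "group-set-by-caller")

    val child = new Thread(new Runnable {
      override def run(): Unit = {
        // Prints "group-set-by-caller": the child inherited the caller's properties.
        println(props.get().getProperty("spark.jobGroup.id"))
      }
    })
    child.start()
    child.join()
  }
}
```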

The solution in this PR is meant for Spark branch 1.5, where local properties 
are inherited by cloning them only when the Spark config 
`spark.localProperties.clone` is set to `true` (see #8781 for the PR against 
the Spark master branch). Similar to the approach taken in #8721, 
StreamingContext sets that configuration to true, which ensures that all 
subsequent child threads get a cloned copy of the thread-local properties. This 
allows the job group and job description to be explicitly removed in the thread 
that starts the streaming scheduler, so that none of the subsequent child 
threads inherit them. Also, the start is done in a new child thread, so that 
setting the job group and description for streaming does not change those 
properties in the thread that called streamingContext.start().
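
Condensed, the new start path looks roughly like the sketch below (simplified 
from the actual change in the diff that follows; `runInNewThread`, including 
the stack-trace rewriting for exceptions, is added to `ThreadUtils.scala`):

```scala
// Simplified fragment from inside StreamingContext.start() (see the diff below
// for the real change). ThreadUtils.runInNewThread executes the body in a fresh
// thread and rethrows any exception in the caller, so clearing the job group
// here cannot leak back into the thread that called streamingContext.start().
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  scheduler.start()
}
```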

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8856 from tdas/SPARK-10649-1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0e6e531
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0e6e531
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0e6e531

Branch: refs/heads/branch-1.5
Commit: d0e6e5312b8b51617b1e7d6f32be50fbe3d48b2a
Parents: f83b6e6
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Sep 22 00:06:18 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Sep 22 00:06:18 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ThreadUtils.scala     | 59 ++++++++++++++++++++
 .../apache/spark/util/ThreadUtilsSuite.scala    | 24 +++++++-
 .../spark/streaming/StreamingContext.scala      | 21 ++++++-
 .../spark/streaming/StreamingContextSuite.scala | 32 +++++++++++
 4 files changed, 132 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0e6e531/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ca5624a..22e291a 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -21,6 +21,7 @@ package org.apache.spark.util
 import java.util.concurrent._
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 
@@ -86,4 +87,62 @@ private[spark] object ThreadUtils {
     val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
     Executors.newSingleThreadScheduledExecutor(threadFactory)
   }
+
+  /**
+   * Run a piece of code in a new thread and return the result. Exception in the new thread is
+   * thrown in the caller thread with an adjusted stack trace that removes references to this
+   * method for clarity. The exception stack traces will be like the following
+   *
+   * SomeException: exception-message
+   *   at CallerClass.body-method (sourcefile.scala)
+   *   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
+   *   at CallerClass.caller-method (sourcefile.scala)
+   *   ...
+   */
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join()
+
+    exception match {
+      case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread stack that shows methods call from this helper method
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a place holder just specifying that there
+        // was a helper method used, without any further details of the helper
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread using 
${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+          " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0e6e531/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 8c51e6b..620e4de 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -20,8 +20,9 @@ package org.apache.spark.util
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 
@@ -66,4 +67,25 @@ class ThreadUtilsSuite extends SparkFunSuite {
     val futureThreadName = Await.result(f, 10.seconds)
     assert(futureThreadName === callerThreadName)
   }
+
+  test("runInNewThread") {
+    import ThreadUtils._
+    assert(runInNewThread("thread-name") { Thread.currentThread().getName } 
=== "thread-name")
+    assert(runInNewThread("thread-name") { Thread.currentThread().isDaemon } 
=== true)
+    assert(
+      runInNewThread("thread-name", isDaemon = false) { 
Thread.currentThread().isDaemon } === false
+    )
+    val uniqueExceptionMessage = "test" + Random.nextInt()
+    val exception = intercept[IllegalArgumentException] {
+      runInNewThread("thread-name") { throw new 
IllegalArgumentException(uniqueExceptionMessage) }
+    }
+    assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
+    assert(exception.getStackTrace.mkString("\n").contains(
+      "... run in separate thread using org.apache.spark.util.ThreadUtils 
...") === true,
+      "stack trace does not contain expected place holder"
+    )
+    assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
+      "stack trace contains unexpected references to ThreadUtils"
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0e6e531/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b496d1f..9bdaaf8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -202,6 +202,12 @@ class StreamingContext private[streaming] (
 
   private var shutdownHookRef: AnyRef = _
 
+  // The streaming scheduler and other threads started by the StreamingContext
+  // should not inherit the job group and job description from the thread that
+  // starts the context. This configuration allows the job group and job
+  // description to be cleared in threads related to streaming. See SPARK-10649.
+  sparkContext.conf.set("spark.localProperties.clone", "true")
+
   conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
 
   /**
@@ -588,12 +594,20 @@ class StreamingContext private[streaming] (
     state match {
       case INITIALIZED =>
         startSite.set(DStream.getCreationSite())
-        sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
           try {
             validate()
-            scheduler.start()
+
+            // Start the streaming scheduler in a new thread, so that thread local properties
+            // like call sites and job groups can be reset without affecting those of the
+            // current thread.
+            ThreadUtils.runInNewThread("streaming-start") {
+              sparkContext.setCallSite(startSite.get)
+              sparkContext.clearJobGroup()
+              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+              scheduler.start()
+            }
             state = StreamingContextState.ACTIVE
           } catch {
             case NonFatal(e) =>
@@ -618,6 +632,7 @@ class StreamingContext private[streaming] (
     }
   }
 
+
   /**
   * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.

http://git-wip-us.apache.org/repos/asf/spark/blob/d0e6e531/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d26894e..3b9d0d1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(ssc.scheduler.isStarted === false)
   }
 
+  test("start should set job group and description of streaming jobs 
correctly") {
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
+    val sc = ssc.sc
+
+    @volatile var jobGroupFound: String = ""
+    @volatile var jobDescFound: String = ""
+    @volatile var jobInterruptFound: String = ""
+    @volatile var allFound: Boolean = false
+
+    addInputStream(ssc).foreachRDD { rdd =>
+      jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+    jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+      allFound = true
+    }
+    ssc.start()
+
+    eventually(timeout(10 seconds), interval(10 milliseconds)) {
+      assert(allFound === true)
+    }
+
+    // Verify streaming jobs have expected thread-local properties
+    assert(jobGroupFound === null)
+    assert(jobDescFound === null)
+    assert(jobInterruptFound === "false")
+
+    // Verify current thread's thread-local properties have not changed
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
+    assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+  }
 
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)

