Repository: spark
Updated Branches:
  refs/heads/master 12e2551ea -> 655032965


[SPARK-3762] clear reference of SparkEnv after stop

SparkEnv is cached in a ThreadLocal object, so after stopping a SparkContext 
and creating a new one, some threads still hold references to the old SparkEnv. 
This causes many problems; for example, PySpark breaks after a SparkContext 
restart because Py4J uses a thread pool for RPC.

This patch clears all references to a SparkEnv after it is stopped.
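
As a rough, self-contained illustration (not Spark code; StaleEnvDemo, 
ThreadLocalEnv and GlobalEnv are made-up names), this sketch shows why a 
ThreadLocal-cached reference goes stale for a pooled worker thread after a 
restart, while a single volatile reference does not:

    import java.util.concurrent.Executors

    object StaleEnvDemo extends App {
      object ThreadLocalEnv {
        // old scheme: each thread caches its own reference, with a global fallback
        private val local = new ThreadLocal[String]
        @volatile private var last: String = _
        def set(e: String): Unit = { last = e; local.set(e) }
        def get: String = Option(local.get()).getOrElse(last)
      }

      object GlobalEnv {
        // new scheme: one volatile reference read by every thread
        @volatile private var env: String = _
        def set(e: String): Unit = { env = e }
        def get: String = env
      }

      val pool = Executors.newFixedThreadPool(1)
      pool.submit(new Runnable { def run(): Unit = ThreadLocalEnv.set("env-1") })
      Thread.sleep(100)            // let the worker thread cache "env-1"
      ThreadLocalEnv.set("env-2")  // simulate stopping and recreating the environment
      GlobalEnv.set("env-2")
      pool.submit(new Runnable {
        def run(): Unit = {
          // the pooled thread still sees the stale thread-local value, but the fresh global one
          println(s"thread-local: ${ThreadLocalEnv.get}  global: ${GlobalEnv.get}")
        }
      })
      pool.shutdown()
    }

Running it prints "thread-local: env-1  global: env-2", which is the same kind 
of staleness the pooled Py4J threads hit with the old ThreadLocal scheme.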

cc mateiz tdas pwendell

Author: Davies Liu <davies....@gmail.com>

Closes #2624 from davies/env and squashes the following commits:

a69f30c [Davies Liu] deprecate getThreadLocal
ba77ca4 [Davies Liu] remove getThreadLocal(), update docs
ee62bb7 [Davies Liu] cleanup ThreadLocal of SparkEnv
4d0ea8b [Davies Liu] clear reference of SparkEnv after stop
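
Since getThreadLocal is only deprecated rather than removed, existing callers 
keep compiling. A hedged sketch of the migration (assuming a SparkEnv has 
already been set for the running application):

    // before (still compiles, now emits a deprecation warning):
    // val ser = SparkEnv.getThreadLocal.closureSerializer.newInstance()

    // after: the single global reference is visible from any thread
    val ser = org.apache.spark.SparkEnv.get.closureSerializer.newInstance()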


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65503296
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65503296
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65503296

Branch: refs/heads/master
Commit: 655032965fc7e2368dff9947fc024ac720ffd19c
Parents: 12e2551
Author: Davies Liu <davies....@gmail.com>
Authored: Tue Oct 7 12:06:12 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Oct 7 12:06:12 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala   | 19 ++++++++-----------
 .../org/apache/spark/api/python/PythonRDD.scala  |  1 -
 .../org/apache/spark/executor/Executor.scala     |  2 --
 .../scala/org/apache/spark/rdd/PipedRDD.scala    |  1 -
 .../apache/spark/scheduler/DAGScheduler.scala    |  1 -
 .../spark/scheduler/TaskSchedulerImpl.scala      |  2 --
 .../spark/streaming/scheduler/JobGenerator.scala |  1 -
 .../spark/streaming/scheduler/JobScheduler.scala |  1 -
 .../streaming/scheduler/ReceiverTracker.scala    |  1 -
 9 files changed, 8 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 72cac42..aba713c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
  * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
+ * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
  *
  * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
  *       in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
 }
 
 object SparkEnv extends Logging {
-  private val env = new ThreadLocal[SparkEnv]
-  @volatile private var lastSetSparkEnv : SparkEnv = _
+  @volatile private var env: SparkEnv = _
 
   private[spark] val driverActorSystemName = "sparkDriver"
   private[spark] val executorActorSystemName = "sparkExecutor"
 
   def set(e: SparkEnv) {
-    lastSetSparkEnv = e
-    env.set(e)
+    env = e
   }
 
   /**
-   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
-   * previously set in any thread.
+   * Returns the SparkEnv.
    */
   def get: SparkEnv = {
-    Option(env.get()).getOrElse(lastSetSparkEnv)
+    env
   }
 
   /**
    * Returns the ThreadLocal SparkEnv.
    */
+  @deprecated("Use SparkEnv.get instead", "1.2")
   def getThreadLocal: SparkEnv = {
-    env.get()
+    env
   }
 
   private[spark] def create(

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 9241414..ad6eb9e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -196,7 +196,6 @@ private[spark] class PythonRDD(
 
     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
-        SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
         // Partition index

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9bbfcdc..616c7e6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -148,7 +148,6 @@ private[spark] class Executor(
 
     override def run() {
       val startTime = System.currentTimeMillis()
-      SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = SparkEnv.get.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
@@ -158,7 +157,6 @@ private[spark] class Executor(
       val startGCTime = gcTime
 
       try {
-        SparkEnv.set(env)
         Accumulators.clear()
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 5d77d37..56ac7a6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -131,7 +131,6 @@ private[spark] class PipedRDD[T: ClassTag](
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + command) {
       override def run() {
-        SparkEnv.set(env)
         val out = new PrintWriter(proc.getOutputStream)
 
         // input the pipe context firstly

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8135cdb..788eb1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -630,7 +630,6 @@ class DAGScheduler(
   protected def runLocallyWithinThread(job: ActiveJob) {
     var jobResult: JobResult = JobSucceeded
     try {
-      SparkEnv.set(env)
       val rdd = job.finalStage.rdd
       val split = rdd.partitions(job.partitions(0))
       val taskContext =

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4dc5504..6d697e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -216,8 +216,6 @@ private[spark] class TaskSchedulerImpl(
    * that tasks are balanced across the cluster.
    */
   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
-    SparkEnv.set(sc.env)
-
     // Mark each slave as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 3748483..7d73ada 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   /** Generate jobs and perform checkpoint for the given `time`.  */
   private def generateJobs(time: Time) {
-    SparkEnv.set(ssc.env)
     Try(graph.generateJobs(time)) match {
       case Success(jobs) =>
         val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1b034b9..cfa3cd8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
     jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
-    SparkEnv.set(ssc.env)
   }
 
   private def handleJobCompletion(job: Job) {

http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 5307fe1..7149dbc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
     @transient val thread  = new Thread() {
       override def run() {
         try {
-          SparkEnv.set(env)
           startReceivers()
         } catch {
           case ie: InterruptedException => logInfo("ReceiverLauncher 
interrupted")

