Repository: spark Updated Branches: refs/heads/master 12e2551ea -> 655032965
[SPARK-3762] clear reference of SparkEnv after stop SparkEnv is cached in ThreadLocal object, so after stop and create a new SparkContext, old SparkEnv is still used by some threads, it will trigger many problems, for example, pyspark will have problem after restart SparkContext, because py4j use thread pool for RPC. This patch will clear all the references after stop a SparkEnv. cc mateiz tdas pwendell Author: Davies Liu <davies....@gmail.com> Closes #2624 from davies/env and squashes the following commits: a69f30c [Davies Liu] deprecate getThreadLocal ba77ca4 [Davies Liu] remove getThreadLocal(), update docs ee62bb7 [Davies Liu] cleanup ThreadLocal of SparnENV 4d0ea8b [Davies Liu] clear reference of SparkEnv after stop Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65503296 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65503296 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65503296 Branch: refs/heads/master Commit: 655032965fc7e2368dff9947fc024ac720ffd19c Parents: 12e2551 Author: Davies Liu <davies....@gmail.com> Authored: Tue Oct 7 12:06:12 2014 -0700 Committer: Matei Zaharia <ma...@databricks.com> Committed: Tue Oct 7 12:06:12 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkEnv.scala | 19 ++++++++----------- .../org/apache/spark/api/python/PythonRDD.scala | 1 - .../org/apache/spark/executor/Executor.scala | 2 -- .../scala/org/apache/spark/rdd/PipedRDD.scala | 1 - .../apache/spark/scheduler/DAGScheduler.scala | 1 - .../spark/scheduler/TaskSchedulerImpl.scala | 2 -- .../spark/streaming/scheduler/JobGenerator.scala | 1 - .../spark/streaming/scheduler/JobScheduler.scala | 1 - .../streaming/scheduler/ReceiverTracker.scala | 1 - 9 files changed, 8 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72cac42..aba713c 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils} * :: DeveloperApi :: * Holds all the runtime environment objects for a running Spark instance (either master or worker), * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently - * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these - * objects needs to have the right SparkEnv set. You can get the current environment with - * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set. + * Spark code finds the SparkEnv through a global variable, so all the threads can access the same + * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext). * * NOTE: This is not intended for external use. This is exposed for Shark and may be made private * in a future release. @@ -119,30 +118,28 @@ class SparkEnv ( } object SparkEnv extends Logging { - private val env = new ThreadLocal[SparkEnv] - @volatile private var lastSetSparkEnv : SparkEnv = _ + @volatile private var env: SparkEnv = _ private[spark] val driverActorSystemName = "sparkDriver" private[spark] val executorActorSystemName = "sparkExecutor" def set(e: SparkEnv) { - lastSetSparkEnv = e - env.set(e) + env = e } /** - * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv - * previously set in any thread. + * Returns the SparkEnv. */ def get: SparkEnv = { - Option(env.get()).getOrElse(lastSetSparkEnv) + env } /** * Returns the ThreadLocal SparkEnv. */ + @deprecated("Use SparkEnv.get instead", "1.2") def getThreadLocal: SparkEnv = { - env.get() + env } private[spark] def create( http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 9241414..ad6eb9e 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -196,7 +196,6 @@ private[spark] class PythonRDD( override def run(): Unit = Utils.logUncaughtExceptions { try { - SparkEnv.set(env) val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) // Partition index http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9bbfcdc..616c7e6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -148,7 +148,6 @@ private[spark] class Executor( override def run() { val startTime = System.currentTimeMillis() - SparkEnv.set(env) Thread.currentThread.setContextClassLoader(replClassLoader) val ser = SparkEnv.get.closureSerializer.newInstance() logInfo(s"Running $taskName (TID $taskId)") @@ -158,7 +157,6 @@ private[spark] class Executor( val startGCTime = gcTime try { - SparkEnv.set(env) Accumulators.clear() val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 5d77d37..56ac7a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -131,7 +131,6 @@ private[spark] class PipedRDD[T: ClassTag]( // Start a thread to feed the process input from our parent's iterator new Thread("stdin writer for " + command) { override def run() { - SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) // input the pipe context firstly http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8135cdb..788eb1f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -630,7 +630,6 @@ class DAGScheduler( protected def runLocallyWithinThread(job: ActiveJob) { var jobResult: JobResult = JobSucceeded try { - SparkEnv.set(env) val rdd = job.finalStage.rdd val split = rdd.partitions(job.partitions(0)) val taskContext = http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4dc5504..6d697e3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -216,8 +216,6 @@ private[spark] class TaskSchedulerImpl( * that tasks are balanced across the cluster. */ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { - SparkEnv.set(sc.env) - // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 3748483..7d73ada 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { /** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) { - SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 1b034b9..cfa3cd8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } jobSet.handleJobStart(job) logInfo("Starting job " + job.id + " from job set of time " + jobSet.time) - SparkEnv.set(ssc.env) } private def handleJobCompletion(job: Job) { http://git-wip-us.apache.org/repos/asf/spark/blob/65503296/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 5307fe1..7149dbc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { @transient val thread = new Thread() { override def run() { try { - SparkEnv.set(env) startReceivers() } catch { case ie: InterruptedException => logInfo("ReceiverLauncher interrupted") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org