Repository: spark
Updated Branches:
  refs/heads/master 7bf9b1201 -> c3e23bc0c


[SPARK-10653][CORE] Remove unnecessary things from SparkEnv

## What changes were proposed in this pull request?

Removed blockTransferService and sparkFilesDir from SparkEnv since they're 
rarely used and don't need to be stored in the env. Edited their few usages 
to accommodate the change.

## How was this patch tested?

Ran dev/run-tests locally.

Author: Alex Bozarth <[email protected]>

Closes #12970 from ajbozarth/spark10653.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3e23bc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3e23bc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3e23bc0

Branch: refs/heads/master
Commit: c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a
Parents: 7bf9b12
Author: Alex Bozarth <[email protected]>
Authored: Mon May 9 11:51:37 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Mon May 9 11:51:37 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  | 26 ++++----------------
 .../scala/org/apache/spark/SparkFiles.scala     |  2 +-
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../org/apache/spark/DistributedSuite.scala     |  2 +-
 project/MimaExcludes.scala                      |  4 +++
 5 files changed, 12 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4bf8890..af50a6d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,6 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, 
UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -61,10 +60,8 @@ class SparkEnv (
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
-    val blockTransferService: BlockTransferService,
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
-    val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val memoryManager: MemoryManager,
     val outputCommitCoordinator: OutputCommitCoordinator,
@@ -77,7 +74,7 @@ class SparkEnv (
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new 
MapMaker().softValues().makeMap[String, Any]()
 
-  private var driverTmpDirToDelete: Option[String] = None
+  private[spark] var driverTmpDir: Option[String] = None
 
   private[spark] def stop() {
 
@@ -94,13 +91,10 @@ class SparkEnv (
       rpcEnv.shutdown()
       rpcEnv.awaitTermination()
 
-      // Note that blockTransferService is stopped by BlockManager since it is 
started by it.
-
       // If we only stop sc, but the driver process still run as a services 
then we need to delete
       // the tmp dir, if not, it will create too many tmp dirs.
-      // We only need to delete the tmp dir create by driver, because 
sparkFilesDir is point to the
-      // current working dir in executor which we do not need to delete.
-      driverTmpDirToDelete match {
+      // We only need to delete the tmp dir create by driver
+      driverTmpDir match {
         case Some(path) =>
           try {
             Utils.deleteRecursively(new File(path))
@@ -342,15 +336,6 @@ object SparkEnv extends Logging {
       ms
     }
 
-    // Set the sparkFiles directory, used when downloading dependencies.  In 
local mode,
-    // this is a temporary directory; in distributed mode, this is the 
executor's current working
-    // directory.
-    val sparkFilesDir: String = if (isDriver) {
-      Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
-    } else {
-      "."
-    }
-
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
       new OutputCommitCoordinator(conf, isDriver)
     }
@@ -367,10 +352,8 @@ object SparkEnv extends Logging {
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
-      blockTransferService,
       blockManager,
       securityManager,
-      sparkFilesDir,
       metricsSystem,
       memoryManager,
       outputCommitCoordinator,
@@ -380,7 +363,8 @@ object SparkEnv extends Logging {
     // called, and we only need to do it for driver. Because driver may run as 
a service, and if we
     // don't delete this tmp dir when sc is stopped, then will create too many 
tmp dirs.
     if (isDriver) {
-      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
+      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), 
"userFiles").getAbsolutePath
+      envInstance.driverTmpDir = Some(sparkFilesDir)
     }
 
     envInstance

http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/main/scala/org/apache/spark/SparkFiles.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala 
b/core/src/main/scala/org/apache/spark/SparkFiles.scala
index e85b89f..44f4444 100644
--- a/core/src/main/scala/org/apache/spark/SparkFiles.scala
+++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala
@@ -34,6 +34,6 @@ object SparkFiles {
    * Get the root directory that contains files added through 
`SparkContext.addFile()`.
    */
   def getRootDirectory(): String =
-    SparkEnv.get.sparkFilesDir
+    SparkEnv.get.driverTmpDir.getOrElse(".")
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f2d06c7..c56e451 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -65,7 +65,7 @@ private[spark] class BlockManager(
     memoryManager: MemoryManager,
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
-    blockTransferService: BlockTransferService,
+    val blockTransferService: BlockTransferService,
     securityManager: SecurityManager,
     numUsableCores: Int)
   extends BlockDataManager with BlockEvictionHandler with Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index a0086e1..0be25e9 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -196,7 +196,7 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
     val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, 
index)).toArray
     val blockId = blockIds(0)
     val blockManager = SparkEnv.get.blockManager
-    val blockTransfer = SparkEnv.get.blockTransferService
+    val blockTransfer = blockManager.blockTransferService
     val serializerManager = SparkEnv.get.serializerManager
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, 
cmId.executorId,

http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 33e0db6..a5d57e1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -694,6 +694,10 @@ object MimaExcludes {
         
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.weights"),
         
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionModel.weights")
       ) ++ Seq(
+        // [SPARK-10653] [Core] Remove unnecessary things from SparkEnv
+        
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.sparkFilesDir"),
+        
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.blockTransferService")
+      ) ++ Seq(
         // SPARK-14654: New accumulator API
         
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ExceptionFailure$"),
         
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.apply"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to