Repository: spark
Updated Branches:
  refs/heads/master d90434c03 -> b6cf13481


[SPARK-2889] Create Hadoop config objects consistently.

Different places in the code were instantiating Configuration / 
YarnConfiguration objects in different ways. This could confuse people who 
expected "spark.hadoop.*" options to end up in the configs used by Spark 
code, since that translation only happened for the SparkContext's config.
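
For illustration only (this snippet is not part of the patch, and the 
property name and value are just examples), the expectation looks like this:

    import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
    val sc = new SparkContext(sparkConf)
    // The "spark.hadoop." prefix is stripped and the key is copied into the
    // Hadoop config. Before this change only sc.hadoopConfiguration saw it;
    // Configuration objects created elsewhere in Spark did not.
    sc.hadoopConfiguration.get("fs.defaultFS")  // "hdfs://namenode:8020"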

This change modifies most places to use SparkHadoopUtil to initialize configs, 
and makes that method do the translation that was previously done only inside 
SparkContext.
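
Roughly, the translation now lives in the shared helper. Here is a 
trimmed-down sketch of the new newConfiguration(conf: SparkConf) overload 
(the full version in the diff below also handles the S3 environment 
variables and io.file.buffer.size):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    def newConfiguration(conf: SparkConf): Configuration = {
      val hadoopConf = new Configuration()
      if (conf != null) {
        // Copy any "spark.hadoop.foo=bar" Spark options into the Hadoop
        // config as "foo=bar".
        conf.getAll.foreach { case (key, value) =>
          if (key.startsWith("spark.hadoop.")) {
            hadoopConf.set(key.substring("spark.hadoop.".length), value)
          }
        }
      }
      hadoopConf
    }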

The places that were not changed fall into one of the following categories:
- Test code where this doesn't really matter
- Places deep in the code where plumbing SparkConf would be too difficult for 
very little gain
- Default values for arguments - since the caller can provide their own config 
in that case

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #1843 from vanzin/SPARK-2889 and squashes the following commits:

52daf35 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
f179013 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
51e71cf [Marcelo Vanzin] Add test to ensure that overriding Yarn configs works.
53f9506 [Marcelo Vanzin] Add DeveloperApi annotation.
3d345cb [Marcelo Vanzin] Restore old method for backwards compat.
fc45067 [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
0ac3fdf [Marcelo Vanzin] Merge branch 'master' into SPARK-2889
3f26760 [Marcelo Vanzin] Compilation fix.
f16cadd [Marcelo Vanzin] Initialize config in SparkHadoopUtil.
b8ab173 [Marcelo Vanzin] Update Utils API to take a Configuration argument.
1e7003f [Marcelo Vanzin] Replace explicit Configuration instantiation with 
SparkHadoopUtil.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6cf1348
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6cf1348
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6cf1348

Branch: refs/heads/master
Commit: b6cf1348170951396a6a5d8a65fb670382304f5b
Parents: d90434c
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Sat Aug 30 14:48:07 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Aug 30 14:48:07 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 24 ++----------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 39 ++++++++++++++++++--
 .../deploy/history/FsHistoryProvider.scala      |  4 +-
 .../org/apache/spark/deploy/master/Master.scala |  6 ++-
 .../spark/deploy/worker/DriverRunner.scala      | 11 +++---
 .../org/apache/spark/deploy/worker/Worker.scala |  4 +-
 .../org/apache/spark/executor/Executor.scala    | 14 ++++---
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  2 +-
 .../spark/scheduler/EventLoggingListener.scala  |  5 ++-
 .../cluster/SimrSchedulerBackend.scala          |  5 ++-
 .../org/apache/spark/util/FileLogger.scala      | 13 ++++++-
 .../scala/org/apache/spark/util/Utils.scala     | 20 +++++-----
 .../apache/spark/deploy/JsonProtocolSuite.scala |  4 +-
 .../spark/deploy/worker/DriverRunnerTest.scala  |  5 ++-
 .../scheduler/EventLoggingListenerSuite.scala   |  4 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |  4 +-
 .../org/apache/spark/util/FileLoggerSuite.scala |  4 +-
 .../org/apache/spark/examples/SparkHdfsLR.scala |  4 +-
 .../spark/examples/SparkTachyonHdfsLR.scala     |  4 +-
 .../apache/spark/repl/ExecutorClassLoader.scala |  8 ++--
 .../spark/repl/ExecutorClassLoaderSuite.scala   | 10 ++---
 .../org/apache/spark/deploy/yarn/Client.scala   |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  3 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  3 +-
 .../cluster/YarnClientClusterScheduler.scala    |  8 +---
 .../cluster/YarnClusterScheduler.scala          |  9 +----
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 15 +++++++-
 .../org/apache/spark/deploy/yarn/Client.scala   |  4 +-
 28 files changed, 144 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e132955..a80b3cc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
   ui.bind()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", 
System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", 
System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", 
System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", 
System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
+  val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
@@ -827,7 +808,8 @@ class SparkContext(config: SparkConf) extends Logging {
     addedFiles(key) = System.currentTimeMillis
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
+    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+      hadoopConfiguration)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 148115d..fe0ad9e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,15 +24,18 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
 
 import scala.collection.JavaConversions._
 
 /**
+ * :: DeveloperApi ::
  * Contains util methods to interact with Hadoop from Spark.
  */
+@DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration()
+  val conf: Configuration = newConfiguration(new SparkConf())
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -64,11 +67,39 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
+  @Deprecated
+  def newConfiguration(): Configuration = newConfiguration(null)
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
    */
-  def newConfiguration(): Configuration = new Configuration()
+  def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+
+    // Note: this null check is around more than just access to the "conf" object to maintain
+    // the behavior of the old implementation of this code, for backwards compatibility.
+    if (conf != null) {
+      // Explicitly check for S3 environment variables
+      if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+          System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+        hadoopConf.set("fs.s3.awsAccessKeyId", 
System.getenv("AWS_ACCESS_KEY_ID"))
+        hadoopConf.set("fs.s3n.awsAccessKeyId", 
System.getenv("AWS_ACCESS_KEY_ID"))
+        hadoopConf.set("fs.s3.awsSecretAccessKey", 
System.getenv("AWS_SECRET_ACCESS_KEY"))
+        hadoopConf.set("fs.s3n.awsSecretAccessKey", 
System.getenv("AWS_SECRET_ACCESS_KEY"))
+      }
+      // Copy any "spark.hadoop.foo=bar" system properties into conf as 
"foo=bar"
+      conf.getAll.foreach { case (key, value) =>
+        if (key.startsWith("spark.hadoop.")) {
+          hadoopConf.set(key.substring("spark.hadoop.".length), value)
+        }
+      }
+      val bufferSize = conf.get("spark.buffer.size", "65536")
+      hadoopConf.set("io.file.buffer.size", bufferSize)
+    }
+
+    hadoopConf
+  }
 
   /**
    * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +117,7 @@ class SparkHadoopUtil extends Logging {
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
-  def loginUserFromKeytab(principalName: String, keytabFilename: String) { 
+  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index cc06540..05c8a90 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     .map { d => Utils.resolveURI(d) }
     .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
 
-  private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
+  private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
+    SparkHadoopUtil.get.newConfiguration(conf))
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 5017273..2a66fcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
+  SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
@@ -673,7 +674,8 @@ private[spark] class Master(
       app.desc.appUiUrl = notFoundBasePath
       return false
     }
-    val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
+    val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+      SparkHadoopUtil.get.newConfiguration(conf))
     val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
     val eventLogPaths = eventLogInfo.logPaths
     val compressionCodec = eventLogInfo.compressionCodec

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 5caaf6b..9f99117 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -28,8 +28,8 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}
 
-import org.apache.spark.Logging
-import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
  * This is currently only used in standalone cluster deploy mode.
  */
 private[spark] class DriverRunner(
+    val conf: SparkConf,
     val driverId: String,
     val workDir: File,
     val sparkHome: File,
@@ -144,8 +145,8 @@ private[spark] class DriverRunner(
 
     val jarPath = new Path(driverDesc.jarUrl)
 
-    val emptyConf = new Configuration()
-    val jarFileSystem = jarPath.getFileSystem(emptyConf)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val jarFileSystem = jarPath.getFileSystem(hadoopConf)
 
     val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
     val jarFileName = jarPath.getName
@@ -154,7 +155,7 @@ private[spark] class DriverRunner(
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 81400af..e475567 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -257,7 +257,7 @@ private[spark] class Worker(
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         executors.get(fullId) match {
-          case Some(executor) => 
+          case Some(executor) =>
             logInfo("Executor " + fullId + " finished with state " + state +
               message.map(" message " + _).getOrElse("") +
               exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -288,7 +288,7 @@ private[spark] class Worker(
 
     case LaunchDriver(driverId, driverDesc) => {
       logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+      val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
       drivers(driverId) = driver
       driver.start()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2f76e53..d7d19f6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -294,9 +295,9 @@ private[spark] class Executor(
       try {
         val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
           .asInstanceOf[Class[_ <: ClassLoader]]
-        val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
-          classOf[Boolean])
-        constructor.newInstance(classUri, parent, userClassPathFirst)
+        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
+          classOf[ClassLoader], classOf[Boolean])
+        constructor.newInstance(conf, classUri, parent, userClassPathFirst)
       } catch {
         case _: ClassNotFoundException =>
           logError("Could not find org.apache.spark.repl.ExecutorClassLoader 
on classpath!")
@@ -313,16 +314,19 @@ private[spark] class Executor(
    * SparkContext. Also adds any new JARs we fetched to the class loader.
    */
   private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     synchronized {
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
+          hadoopConf)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
+          hadoopConf)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 2093878..7ba1182 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -157,7 +157,7 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val conf = SparkHadoopUtil.get.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
     val fs = path.getFileSystem(conf)
     val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
     sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 370fcd8..4b99f63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,11 +44,14 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
 private[spark] class EventLoggingListener(
     appName: String,
     sparkConf: SparkConf,
-    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
+    hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
+  def this(appName: String, sparkConf: SparkConf) =
+    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 4f7133c..bc7670f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class SimrSchedulerBackend(
@@ -44,7 +45,7 @@ private[spark] class SimrSchedulerBackend(
       sc.conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
-    val conf = new Configuration()
+    val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
 
     logInfo("Writing to HDFS file: "  + driverFilePath)
@@ -63,7 +64,7 @@ private[spark] class SimrSchedulerBackend(
   }
 
   override def stop() {
-    val conf = new Configuration()
+    val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)
     fs.delete(new Path(driverFilePath), false)
     super.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index ad8b79a..6d1fc05 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -41,13 +41,22 @@ import org.apache.spark.io.CompressionCodec
 private[spark] class FileLogger(
     logDir: String,
     sparkConf: SparkConf,
-    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
+    hadoopConf: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true,
     dirPermissions: Option[FsPermission] = None)
   extends Logging {
 
+  def this(
+      logDir: String,
+      sparkConf: SparkConf,
+      compress: Boolean = false,
+      overwrite: Boolean = true) = {
+    this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
+      overwrite = overwrite)
+  }
+
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
     override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
   }
@@ -57,7 +66,7 @@ private[spark] class FileLogger(
    * create unique FileSystem instance only for FileLogger
    */
   private val fileSystem = {
-    val conf = SparkHadoopUtil.get.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
     val logUri = new URI(logDir)
     val scheme = logUri.getScheme
     if (scheme == "hdfs") {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 86f646d..0ae28f9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
@@ -318,7 +319,8 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
+  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+    hadoopConf: Configuration) {
     val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
     val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -390,7 +392,7 @@ private[spark] object Utils extends Logging {
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val fs = getHadoopFileSystem(uri)
+        val fs = getHadoopFileSystem(uri, hadoopConf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
@@ -862,8 +864,8 @@ private[spark] object Utils extends Logging {
    */
   def getCallSite: CallSite = {
     val trace = Thread.currentThread.getStackTrace()
-      .filterNot { ste:StackTraceElement => 
-        // When running under some profilers, the current stack trace might contain some bogus 
+      .filterNot { ste:StackTraceElement =>
+        // When running under some profilers, the current stack trace might contain some bogus
         // frames. This is intended to ensure that we don't crash in these situations by
         // ignoring any frames that we can't examine.
         (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1179,15 +1181,15 @@ private[spark] object Utils extends Logging {
   /**
    * Return a Hadoop FileSystem with the scheme encoded in the given path.
    */
-  def getHadoopFileSystem(path: URI): FileSystem = {
-    FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
+  def getHadoopFileSystem(path: URI, conf: Configuration): FileSystem = {
+    FileSystem.get(path, conf)
   }
 
   /**
    * Return a Hadoop FileSystem with the scheme encoded in the given path.
    */
-  def getHadoopFileSystem(path: String): FileSystem = {
-    getHadoopFileSystem(new URI(path))
+  def getHadoopFileSystem(path: String, conf: Configuration): FileSystem = {
+    getHadoopFileSystem(new URI(path), conf)
   }
 
   /**
@@ -1264,7 +1266,7 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /** 
+  /**
    * Execute the given block, logging and re-throwing any uncaught exception.
    * This is particularly useful for wrapping code that runs in a thread, to ensure
    * that exceptions are printed, and to avoid having to catch Throwable.

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 31aa7ec..2a58c6a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -121,8 +121,8 @@ class JsonProtocolSuite extends FunSuite {
       new SparkConf, ExecutorState.RUNNING)
   }
   def createDriverRunner(): DriverRunner = {
-    new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), 
createDriverDesc(),
-      null, "akka://worker")
+    new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new 
File("sparkHome"),
+      createDriverDesc(), null, "akka://worker")
   }
 
   def assertValidJson(json: JValue) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index c930839..b6f4411 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -25,14 +25,15 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.{Command, DriverDescription}
 
 class DriverRunnerTest extends FunSuite {
   private def createDriverRunner() = {
     val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, 
command)
-    new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), 
driverDescription,
-      null, "akka://1.2.3.4/worker/")
+    new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new 
File("sparkHome"),
+      driverDescription, null, "akka://1.2.3.4/worker/")
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 10d8b29..41e58a0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
@@ -39,7 +40,8 @@ import java.io.File
  * read and deserialized into actual SparkListenerEvents.
  */
 class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
-  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6b6e010..8f0ee9f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
@@ -32,7 +33,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * Test whether ReplayListenerBus replays events from logs correctly.
  */
 class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
-  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
   private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private var testDir: File = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index 44332fc..c3dd156 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 
 /**
  * Test writing files through the FileLogger.
  */
 class FileLoggerSuite extends FunSuite with BeforeAndAfter {
-  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val fileSystem = Utils.getHadoopFileSystem("/",
+    SparkHadoopUtil.get.newConfiguration(new SparkConf()))
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index d583cf4..3258510 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -22,9 +22,9 @@ import java.util.Random
 import scala.math.exp
 
 import breeze.linalg.{Vector, DenseVector}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
 
 
@@ -70,7 +70,7 @@ object SparkHdfsLR {
 
     val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
     val inputPath = args(0)
-    val conf = SparkHadoopUtil.get.newConfiguration()
+    val conf = new Configuration()
     val sc = new SparkContext(sparkConf,
       InputFormatInfo.computePreferredLocations(
       Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
index 2212762..96d1361 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -22,9 +22,9 @@ import java.util.Random
 import scala.math.exp
 
 import breeze.linalg.{Vector, DenseVector}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.storage.StorageLevel
 
@@ -52,8 +52,8 @@ object SparkTachyonHdfsLR {
 
   def main(args: Array[String]) {
     val inputPath = args(0)
-    val conf = SparkHadoopUtil.get.newConfiguration()
     val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR")
+    val conf = new Configuration()
     val sc = new SparkContext(sparkConf,
       InputFormatInfo.computePreferredLocations(
       Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 687e85c..5ee3250 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -21,10 +21,10 @@ import java.io.{ByteArrayOutputStream, InputStream}
 import java.net.{URI, URL, URLEncoder}
 import java.util.concurrent.{Executors, ExecutorService}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 import org.apache.spark.util.ParentClassLoader
 
@@ -36,7 +36,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
  * used to load classes defined by the interpreter when the REPL is used.
  * Allows the user to specify if user class path should be first
  */
-class ExecutorClassLoader(classUri: String, parent: ClassLoader,
+class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
     userClassPathFirst: Boolean) extends ClassLoader {
   val uri = new URI(classUri)
   val directory = uri.getPath
@@ -48,7 +48,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader,
     if (uri.getScheme() == "http") {
       null
     } else {
-      FileSystem.get(uri, new Configuration())
+      FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index c0af7ce..3e2ee75 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.FunSuite
 
 import com.google.common.io.Files
 
-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.util.Utils
 
 class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
@@ -57,7 +57,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
 
   test("child first") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
     val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "1")
@@ -65,7 +65,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
 
   test("parent first") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, false)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, false)
     val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "2")
@@ -73,7 +73,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
 
   test("child first can fall back") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
     val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance()
     val fakeClassVersion = fakeClass.toString
     assert(fakeClassVersion === "2")
@@ -81,7 +81,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
 
   test("child first can fail") {
     val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
     intercept[java.lang.ClassNotFoundException] {
       classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9be7854..12f1cd3 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   extends YarnClientImpl with ClientBase with Logging {
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, new Configuration(), spConf)
+    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
 
   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5f66a98..8c54840 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -48,7 +48,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   // optimal as more containers are available. Might need to handle this better.
 
   private val sparkConf = new SparkConf()
-  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
+  private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    .asInstanceOf[YarnConfiguration]
   private val isDriver = args.userClass != null
 
   // Default to numExecutors * 2, with minimum of 3

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 2aa27a1..ffe2731 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -54,7 +54,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   // Always create a new config, dont reuse yarnConf.
-  override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+  override def newConfiguration(conf: SparkConf): Configuration =
+    new YarnConfiguration(super.newConfiguration(conf))
 
   // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
   override def addCredentials(conf: JobConf) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index d162b4c..254774a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
@@ -26,14 +25,11 @@ import org.apache.spark.util.Utils
 /**
  * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
-  extends TaskSchedulerImpl(sc) {
-
-  def this(sc: SparkContext) = this(sc, new Configuration())
+private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(conf, host))
+    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 69f4022..4157ff9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -21,19 +21,15 @@ import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration
 
 /**
  * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
  * ApplicationMaster, etc is done
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
-  extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")
 
-  def this(sc: SparkContext) = this(sc, new Configuration())
-
   // Nothing else for now ... initialize application master : which needs a SparkContext to
   // determine how to allocate.
   // Note that only the first creation of a SparkContext influences (and ideally, there must be
@@ -43,8 +39,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
-    if (retval != null) Some(retval) else None
+    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
   }
 
   override def postStartHook() {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 7650bd4..75db8ee 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 
 class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
 
@@ -61,4 +62,16 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
     }
   }
 
+  test("Yarn configuration override") {
+    val key = "yarn.nodemanager.hostname"
+    val default = new YarnConfiguration()
+
+    val sparkConf = new SparkConf()
+      .set("spark.hadoop." + key, "someHostName")
+    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
+
+    yarnConf.getClass() should be (classOf[YarnConfiguration])
+    yarnConf.get(key) should not be default.get(key)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b6cf1348/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1f9a4bf..313a0d2 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SparkConf}
-
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   val yarnClient = YarnClient.createYarnClient
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, new Configuration(), spConf)
+    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
 
   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
 

