Repository: spark Updated Branches: refs/heads/master 463bac001 -> bc36df127
[SPARK-13063][YARN] Make the SPARK YARN STAGING DIR as configurable ## What changes were proposed in this pull request? Made the SPARK YARN STAGING DIR configurable with the configuration 'spark.yarn.stagingDir'. ## How was this patch tested? I have verified it manually by running applications on YARN. If 'spark.yarn.stagingDir' is configured then that value is used as the staging directory; otherwise it uses the default value, i.e. the file system's home directory for the user. Author: Devaraj K <deva...@apache.org> Closes #12082 from devaraj-kavali/SPARK-13063. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc36df12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc36df12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc36df12 Branch: refs/heads/master Commit: bc36df127d3b9f56b4edaeb5eca7697d4aef761a Parents: 463bac0 Author: Devaraj K <deva...@apache.org> Authored: Tue Apr 5 14:12:00 2016 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Tue Apr 5 14:12:00 2016 -0500 ---------------------------------------------------------------------- docs/running-on-yarn.md | 7 +++++++ .../org/apache/spark/deploy/yarn/Client.scala | 18 +++++++++++++++--- .../org/apache/spark/deploy/yarn/config.scala | 5 +++++ 3 files changed, 27 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bc36df12/docs/running-on-yarn.md ---------------------------------------------------------------------- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index bb83272..ddc75a7 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -160,6 +160,13 @@ If you need a reference to the proper location to put log files in the YARN so t </td> </tr> <tr> + <td><code>spark.yarn.stagingDir</code></td> + <td>Current user's home directory in the filesystem</td> + <td> + Staging directory used 
while submitting applications. + </td> +</tr> +<tr> <td><code>spark.yarn.preserve.staging.files</code></td> <td><code>false</code></td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/bc36df12/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 336e29f..5e7e3be 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -182,8 +182,8 @@ private[spark] class Client( val appStagingDir = getAppStagingDir(appId) try { val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) - val stagingDirPath = new Path(appStagingDir) val fs = FileSystem.get(hadoopConf) + val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir) if (!preserveFiles && fs.exists(stagingDirPath)) { logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) @@ -357,7 +357,7 @@ private[spark] class Client( // Upload Spark and the application JAR to the remote file system if necessary, // and add them as local resources to the application master. val fs = FileSystem.get(hadoopConf) - val dst = new Path(fs.getHomeDirectory(), appStagingDir) + val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir) val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) // Used to keep track of URIs added to the distributed cache. 
If the same URI is added @@ -668,7 +668,7 @@ private[spark] class Client( env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() if (loginFromKeytab) { val remoteFs = FileSystem.get(hadoopConf) - val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir) + val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir) val credentialsFile = "credentials-" + UUID.randomUUID().toString sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) logInfo(s"Credentials file set to: $credentialsFile") @@ -1438,4 +1438,16 @@ private object Client extends Logging { uri.startsWith(s"$LOCAL_SCHEME:") } + /** + * Returns the app staging dir based on the STAGING_DIR configuration if configured + * otherwise based on the users home directory. + */ + private def getAppStagingDirPath( + conf: SparkConf, + fs: FileSystem, + appStagingDir: String): Path = { + val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory()) + new Path(baseDir, appStagingDir) + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/bc36df12/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index a3b9134..5188a3e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -108,6 +108,11 @@ package object config { .intConf .optional + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .optional + /* Cluster-mode launcher configuration. 
*/ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org