Updated Branches:
  refs/heads/branch-0.8 32a0c4f61 -> 1d52b5081
Merge pull request #149 from tgravescs/fixSecureHdfsAccess

Fix secure HDFS access for Spark on YARN

https://github.com/apache/incubator-spark/pull/23 broke secure HDFS
access. Not sure if it works with secure HDFS in standalone mode, but
this fixes it at least for Spark on YARN.

The broadcasting-of-JobConf change also broke secure HDFS access
because it didn't take into account callers invoking getPartitions
before the SparkContext is initialized; the DAGScheduler does this
when it tries to getShuffleMapStage.

(cherry picked from commit 8af99f2356ed19fe43d722ada02f8802cbd46d40)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1d52b508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1d52b508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1d52b508

Branch: refs/heads/branch-0.8
Commit: 1d52b5081171bad15527bc882509a874e865260c
Parents: 32a0c4f
Author: Matei Zaharia <[email protected]>
Authored: Sat Nov 9 13:48:00 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sat Nov 9 15:47:40 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala  | 21 +++++++++++++++-----
 .../scala/org/apache/spark/rdd/HadoopRDD.scala |  2 ++
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1d52b508/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index c29a301..fc1537f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
     UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // if we are already running as the user intended there is no reason to do the doAs. It
+    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1d52b508/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 32901a5..47e958b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,6 +132,8 @@ class HadoopRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
+    // add the credentials here as this can be called before SparkContext initialized
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(jobConf)
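
----------------------------------------------------------------------
The runAsUser change above reads more easily as a self-contained
sketch. Only the Hadoop UserGroupInformation API below is real;
RunAsUserSketch and its local SPARK_UNKNOWN_USER constant are
illustrative stand-ins for the Spark names in the patch. The point is
the guard: impersonate only when the target user is known and differs
from the current process user, since a doAs under a freshly created
remote user carries no delegation tokens and thus breaks secure HDFS
access.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Stand-alone sketch of the guard; RunAsUserSketch and the local
// SPARK_UNKNOWN_USER constant are illustrative, not Spark API.
object RunAsUserSketch {
  val SPARK_UNKNOWN_USER = "<unknown>"

  def runAsUser(user: String)(func: () => Unit) {
    val currentUser = Option(System.getProperty("user.name")).
      getOrElse(SPARK_UNKNOWN_USER)
    if (user != SPARK_UNKNOWN_USER && currentUser != user) {
      // Known, different user: impersonate via doAs.
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run: Unit = func()
      })
    } else {
      // Same user (or unknown): run in place so the current UGI's
      // credentials stay visible to HDFS clients.
      func()
    }
  }

  def main(args: Array[String]) {
    runAsUser(System.getProperty("user.name")) { () =>
      println("running as " + UserGroupInformation.getCurrentUser.getShortUserName)
    }
  }
}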

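----------------------------------------------------------------------
The HadoopRDD hunk only matters because SparkHadoopUtil.addCredentials
does real work on YARN: in the base class it is, if I recall the tree
correctly, a no-op, and the YARN subclass overrides it to merge the
current user's tokens into the JobConf. A minimal sketch of that
override, assuming the Hadoop Credentials API and written from memory
rather than copied from the Spark tree:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

class YarnCredentialsSketch {
  // Merge the current UGI's delegation tokens into the JobConf so a
  // JobConf built before the SparkContext finishes initializing can
  // still authenticate to secure HDFS.
  def addCredentials(conf: JobConf) {
    val jobCreds = conf.getCredentials()
    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
  }
}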