Updated Branches:
  refs/heads/branch-0.8 32a0c4f61 -> 1d52b5081

Merge pull request #149 from tgravescs/fixSecureHdfsAccess

Fix secure hdfs access for spark on yarn

https://github.com/apache/incubator-spark/pull/23 broke secure hdfs access. Not 
sure if it works with secure hdfs on standalone. Fixing it at least for spark 
on yarn.

The broadcasting of jobconf change also broke secure hdfs access as it didn't 
take into account things calling the getPartitions before sparkContext is 
initialized. The DAGScheduler does this as it tries to getShuffleMapStage.

(cherry picked from commit 8af99f2356ed19fe43d722ada02f8802cbd46d40)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1d52b508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1d52b508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1d52b508

Branch: refs/heads/branch-0.8
Commit: 1d52b5081171bad15527bc882509a874e865260c
Parents: 32a0c4f
Author: Matei Zaharia <[email protected]>
Authored: Sat Nov 9 13:48:00 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sat Nov 9 15:47:40 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 21 +++++++++++++++-----
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  2 ++
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1d52b508/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index c29a301..fc1537f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
@@ -34,10 +34,21 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    // if we are already running as the user intended there is no reason to do 
the doAs. It 
+    // will actually break secure HDFS access as it doesn't fill in the 
credentials. Also if
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user 
+    // (this is actually the path spark on yarn takes) since SPARK_USER is 
initialized only 
+    // in SparkContext.
+    val currentUser = Option(System.getProperty("user.name")).
+      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
+    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      ugi.doAs(new PrivilegedExceptionAction[Unit] {
+        def run: Unit = func()
+      })
+    } else {
+      func()
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1d52b508/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 32901a5..47e958b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,6 +132,8 @@ class HadoopRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
+    // add the credentials here as this can be called before SparkContext 
initialized
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(jobConf)

Reply via email to