Repository: spark
Updated Branches:
  refs/heads/master 6a37ed838 -> 51b53a758


[SPARK-3260] yarn - pass acls along with executor launch

Pass along the ACL settings when we launch a container so that they can be
applied when viewing the logs on a running NodeManager.
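
In short, the patch funnels Spark's view/modify ACLs into the map of YARN
ApplicationAccessType entries attached to each container. A minimal standalone
sketch of that mapping (names mirror the patch; this is an illustration with
made-up user names, not the exact code):

    import org.apache.hadoop.yarn.api.records.ApplicationAccessType
    import org.apache.spark.{SecurityManager, SparkConf}

    object AclSketch {
      // Same shape as the new YarnSparkHadoopUtil.getApplicationAclsForYarn:
      // YARN's VIEW_APP / MODIFY_APP entries come from Spark's own ACL config.
      def aclsFor(securityMgr: SecurityManager): Map[ApplicationAccessType, String] =
        Map(
          ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
          ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.acls.enable", "true")
          .set("spark.ui.view.acls", "user1,user2")   // illustrative users
          .set("spark.modify.acls", "user3,user4")    // illustrative users
        println(aclsFor(new SecurityManager(conf)))
      }
    }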

Author: Thomas Graves <tgra...@apache.org>

Closes #2185 from tgravescs/SPARK-3260 and squashes the following commits:

6f94b5a [Thomas Graves] make unit test more robust
28b9dd3 [Thomas Graves] yarn - pass acls along with executor launch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51b53a75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51b53a75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51b53a75

Branch: refs/heads/master
Commit: 51b53a758c85f2e20ad9bd73ed815fcfa9c7180b
Parents: 6a37ed8
Author: Thomas Graves <tgra...@apache.org>
Authored: Fri Sep 5 09:54:40 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Fri Sep 5 09:54:40 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  7 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |  7 +-
 .../spark/deploy/yarn/YarnRMClientImpl.scala    |  7 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 13 ++--
 .../apache/spark/deploy/yarn/ClientBase.scala   |  6 +-
 .../spark/deploy/yarn/YarnAllocator.scala       | 10 +--
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  5 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 11 ++-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 76 +++++++++++++++++++-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  7 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |  7 +-
 .../spark/deploy/yarn/YarnRMClientImpl.scala    |  7 +-
 12 files changed, 129 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7dae248..10cbeb8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
 
 
 class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
     slaveId: String,
     hostname: String,
     executorMemory: Int,
-    executorCores: Int)
+    executorCores: Int,
+    securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
+    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
     // Send the start request to the ContainerManager
     val startReq = Records.newRecord(classOf[StartContainerRequest])
     .asInstanceOf[StartContainerRequest]

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 9f9e16c..85d6274 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 
 import org.apache.hadoop.conf.Configuration
@@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler(
     resourceManager: AMRMProtocol,
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+    securityMgr: SecurityManager)
+  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
 
   private val lastResponseId = new AtomicInteger()
   private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index cc53921..ad27a9a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils
 
@@ -45,7 +45,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
       uiAddress: String,
-      uiHistoryAddress: String) = {
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager) = {
     this.rpc = YarnRPC.create(conf)
     this.uiHistoryAddress = uiHistoryAddress
 
@@ -53,7 +54,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     registerApplicationMaster(uiAddress)
 
     new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
-      preferredNodeLocations)
+      preferredNodeLocations, securityMgr)
   }
 
   override def getAttemptId() = {

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 98039a2..a879c83 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     val securityMgr = new SecurityManager(sparkConf)
 
     if (isDriver) {
-      runDriver()
+      runDriver(securityMgr)
     } else {
       runExecutorLauncher(securityMgr)
     }
@@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(uiAddress: String) = {
+  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()
 
     val appId = client.getAttemptId().getApplicationId().toString()
@@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       if (sc != null) sc.getConf else sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
       uiAddress,
-      historyAddress)
+      historyAddress,
+      securityMgr)
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }
 
-  private def runDriver(): Unit = {
+  private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
     val userThread = startUserClass()
 
@@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     if (sc == null) {
       finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
     } else {
-      registerAM(sc.ui.appUIHostPort)
+      registerAM(sc.ui.appUIHostPort, securityMgr)
       try {
         userThread.join()
       } finally {
@@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       conf = sparkConf, securityManager = securityMgr)._1
     actor = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
+    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 5d8e5e6..8075b7a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -430,10 +430,8 @@ trait ClientBase extends Logging {
 
     // send the acl settings into YARN to control who has access via YARN interfaces
     val securityManager = new SecurityManager(sparkConf)
-    val acls = Map[ApplicationAccessType, String] (
-      ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls,
-      ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls)
-    amContainer.setApplicationACLs(acls)
+    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+
     amContainer
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index c74dd1c..02b9a81 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
@@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator(
     conf: Configuration,
     sparkConf: SparkConf,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+    securityMgr: SecurityManager)
   extends Logging {
 
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator(
             executorId,
             executorHostname,
             executorMemory,
-            executorCores)
+            executorCores,
+            securityMgr)
           new Thread(executorRunnable).start()
         }
       }
@@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator(
 
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 922d7d1..ed65e56 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -22,7 +22,7 @@ import scala.collection.{Map, Set}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.records._
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler.SplitInfo
 
 /**
@@ -45,7 +45,8 @@ trait YarnRMClient {
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
       uiAddress: String,
-      uiHistoryAddress: String): YarnAllocator
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager): YarnAllocator
 
   /**
    * Shuts down the AM. Guaranteed to only be called once.

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index dc77f12..4a33e34 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringInterner
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -211,4 +212,12 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
+      Map[ApplicationAccessType, String] = {
+    Map[ApplicationAccessType, String] (
+      ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
+      ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
+    )
+  }
+
 }
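
As the ClientBase and ExecutorRunnable hunks show, the helper's result is then
handed to the container launch context. A condensed, illustrative sketch of
that call site (the surrounding container setup is elided; the implicit
Scala-to-Java map conversion is supplied by the JavaConversions import, as in
the patched files):

    import scala.collection.JavaConversions._  // Scala Map -> java.util.Map
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
    import org.apache.hadoop.yarn.util.Records
    import org.apache.spark.{SecurityManager, SparkConf}

    val securityMgr = new SecurityManager(new SparkConf())
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    // Pass Spark's view/modify ACLs to YARN so the NodeManager enforces them
    // when someone browses this container's logs.
    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))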

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 75db8ee..2cc5abb 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
 
 class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
 
@@ -74,4 +77,75 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
     yarnConf.get(key) should not be default.get(key)
   }
 
+
+  test("test getApplicationAclsForYarn acls on") {
+
+    // spark acls on, just pick up default user
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) => {
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+    modifyAcls match {
+      case Some(macls) => {
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+  }
+
+  test("test getApplicationAclsForYarn acls on and specify users") {
+
+    // default spark acls are on and specify acls
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+    sparkConf.set("spark.ui.view.acls", "user1,user2")
+    sparkConf.set("spark.modify.acls", "user3,user4")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) => {
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user1"))
+        assert(aclSet.contains("user2"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+    modifyAcls match {
+      case Some(macls) => {
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user3"))
+        assert(aclSet.contains("user4"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 07ba0a4..833be12 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
 
 
 class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
     slaveId: String,
     hostname: String,
     executorMemory: Int,
-    executorCores: Int)
+    executorCores: Int,
+    securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
@@ -85,6 +86,8 @@ class ExecutorRunnable(
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
+    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
     // Send the start request to the ContainerManager
     nmClient.startContainer(container, ctx)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index ed31457..c887cb5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf} 
 import org.apache.spark.scheduler.SplitInfo
 
 import org.apache.hadoop.conf.Configuration
@@ -39,8 +39,9 @@ private[yarn] class YarnAllocationHandler(
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]], 
+    securityMgr: SecurityManager)
+  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
 
   override protected def releaseContainer(container: Container) = {
     amClient.releaseAssignedContainer(container.getId())

http://git-wip-us.apache.org/repos/asf/spark/blob/51b53a75/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index e8b8d9b..54bc6b1 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils
 
@@ -46,7 +46,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
       uiAddress: String,
-      uiHistoryAddress: String) = {
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager) = {
     amClient = AMRMClient.createAMRMClient()
     amClient.init(conf)
     amClient.start()
@@ -55,7 +56,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     logInfo("Registering the ApplicationMaster")
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
     new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
-      preferredNodeLocations)
+      preferredNodeLocations, securityMgr)
   }
 
   override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =

