Repository: spark
Updated Branches:
  refs/heads/master 40566e10a -> 7edbea41b


http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index c1e8b29..96a5a12 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -18,21 +18,22 @@
 package org.apache.spark.metrics
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
-
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.MasterSource
 
 class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _
   var conf: SparkConf = null
+  var securityMgr: SecurityManager = null
 
   before {
     filePath = 
getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
     conf = new SparkConf(false).set("spark.metrics.conf", filePath)
+    securityMgr = new SecurityManager(conf)
   }
 
   test("MetricsSystem with default config") {
-    val metricsSystem = MetricsSystem.createMetricsSystem("default", conf)
+    val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, 
securityMgr)
     val sources = metricsSystem.sources
     val sinks = metricsSystem.sinks
 
@@ -42,7 +43,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter 
{
   }
 
   test("MetricsSystem with sources add") {
-    val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
+    val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, 
securityMgr)
     val sources = metricsSystem.sources
     val sinks = metricsSystem.sinks
 

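A minimal sketch of the updated call, mirroring the test above — these are internal Spark APIs (private[spark]), so this is illustrative rather than a supported public usage:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.metrics.MetricsSystem

    // The SecurityManager is built from the same SparkConf and passed as the new third argument.
    val conf = new SparkConf(false)
    val securityMgr = new SecurityManager(conf)
    val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
    val sources = metricsSystem.sources   // sources registered so far
    val sinks = metricsSystem.sinks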
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9f011d9..121e47c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
 
@@ -39,6 +39,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter 
with PrivateMethodT
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
   var oldArch: String = null
+  conf.set("spark.authenticate", "false")
+  val securityMgr = new SecurityManager(conf)
 
   // Reuse a serializer across tests to avoid creating a new thread-local 
buffer on each test
   conf.set("spark.kryoserializer.buffer.mb", "1")
@@ -49,7 +51,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter 
with PrivateMethodT
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
   before {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", 
"localhost", 0, conf = conf)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", 
"localhost", 0, conf = conf,
+      securityManager = securityMgr)
     this.actorSystem = actorSystem
     conf.set("spark.driver.port", boundPort.toString)
 
@@ -125,7 +128,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("master + 1 manager interaction") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -155,8 +158,9 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("master + 2 managers interaction") {
-    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, 
conf)
-    store2 = new BlockManager("exec2", actorSystem, master, new 
KryoSerializer(conf), 2000, conf)
+    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, 
conf, securityMgr)
+    store2 = new BlockManager("exec2", actorSystem, master, new 
KryoSerializer(conf), 2000, conf,
+      securityMgr)
 
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a 
peer")
@@ -171,7 +175,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("removing block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -219,7 +223,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("removing rdd") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -253,7 +257,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
 
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf, securityMgr)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -269,7 +273,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("reregistration on block update") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
@@ -288,7 +292,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
 
   test("reregistration doesn't dead lock") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
2000, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
 
@@ -325,7 +329,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -344,7 +348,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -363,7 +367,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -382,7 +386,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -405,7 +409,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("on-disk storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -418,7 +422,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -433,7 +437,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -448,7 +452,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -463,7 +467,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -478,7 +482,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("LRU with mixed storage levels") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -503,7 +507,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU with streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -527,7 +531,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf, securityMgr)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -573,7 +577,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   }
 
   test("overly large block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 500, 
conf)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 500, 
conf, securityMgr)
     store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -584,7 +588,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -592,7 +596,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
       store = null
 
       conf.set("spark.shuffle.compress", "false")
-      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
         "shuffle_0_0_0 was compressed")
@@ -600,7 +604,7 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
       store = null
 
       conf.set("spark.broadcast.compress", "true")
-      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
         "broadcast_0 was not compressed")
@@ -608,28 +612,28 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
       store = null
 
       conf.set("spark.broadcast.compress", "false")
-      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, 
"broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
-      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle(rdd(0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not 
compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
-      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle(rdd(0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was 
compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
-      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, 
conf)
+      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, 
conf, securityMgr)
       store.putSingle("other_block", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 1000, "other_block 
was compressed")
       store.stop()
@@ -643,7 +647,8 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
 
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
-    store = new BlockManager("<driver>", actorSystem, master, new 
JavaSerializer(conf), 1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, new 
JavaSerializer(conf), 1200, conf,
+      securityMgr)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass

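For reference, the same SecurityManager instance now flows both into AkkaUtils.createActorSystem and into BlockManager as its new final constructor argument. A condensed sketch of the suite's wiring (internal APIs, illustrative only; master and serializer come from the suite's existing setup):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.util.AkkaUtils

    val conf = new SparkConf(false).set("spark.authenticate", "false")
    val securityMgr = new SecurityManager(conf)

    // The test actor system is created with the SecurityManager ...
    val (actorSystem, boundPort) =
      AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf, securityManager = securityMgr)
    conf.set("spark.driver.port", boundPort.toString)

    // ... and the same instance is the last argument to every BlockManager constructed above, e.g.
    //   store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr)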
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 20ebb18..3041581 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -24,6 +24,8 @@ import scala.util.{Failure, Success, Try}
 import org.eclipse.jetty.server.Server
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+
 class UISuite extends FunSuite {
   test("jetty port increases under contention") {
     val startPort = 4040
@@ -34,15 +36,17 @@ class UISuite extends FunSuite {
       case Failure(e) => 
       // Either case server port is busy hence setup for test complete
     }
-    val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", 
startPort, Seq())
-    val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", 
startPort, Seq())
+    val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", 
startPort, Seq(),
+      new SparkConf)
+    val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", 
startPort, Seq(),
+      new SparkConf)
     // Allow some wiggle room in case ports on the machine are under contention
     assert(boundPort1 > startPort && boundPort1 < startPort + 10)
     assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
   }
 
   test("jetty binds to port 0 correctly") {
-    val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, 
Seq())
+    val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, 
Seq(), new SparkConf)
     assert(jettyServer.getState === "STARTED")
     assert(boundPort != 0)
     Try {new ServerSocket(boundPort)} match {

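JettyUtils.startJettyServer similarly gains a SparkConf parameter (presumably so it can pick up UI settings such as spark.ui.filters). A small sketch mirroring the test, using internal APIs for illustration only:

    import org.apache.spark.SparkConf
    import org.apache.spark.ui.JettyUtils

    // Bind to an ephemeral port; the new SparkConf argument carries the UI/security settings.
    val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq(), new SparkConf)
    assert(jettyServer.getState == "STARTED" && boundPort != 0)
    jettyServer.stop()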
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 017d509..913c653 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -147,6 +147,34 @@ Apart from these, the following properties are also 
available, and may be useful
     How many stages the Spark UI remembers before garbage collecting.
   </td>
 </tr>
+<tr>
+  <td>spark.ui.filters</td>
+  <td>None</td>
+  <td>
+    Comma-separated list of filter class names to apply to the Spark web UI. The filter should be a
+    standard javax servlet Filter. Parameters to each filter can also be specified by setting a
+    Java system property of spark.<class name of filter>.params='param1=value1,param2=value2'
+    (e.g. -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing')
+  </td>
+</tr>
+<tr>
+  <td>spark.ui.acls.enable</td>
+  <td>false</td>
+  <td>
+    Whether Spark web UI acls should be enabled. If enabled, this checks to see if the user has
+    access permissions to view the web UI. See <code>spark.ui.view.acls</code> for more details.
+    Also note this requires the user to be known; if the user comes across as null, no checks
+    are done. Filters can be used to authenticate and set the user.
+  </td>
+</tr>
+<tr>  
+  <td>spark.ui.view.acls</td>
+  <td>Empty</td>
+  <td>
+    Comma separated list of users that have view access to the spark web ui. 
By default only the
+    user that started the Spark job has view access.
+  </td>
+</tr>
 <tr>
   <td>spark.shuffle.compress</td>
   <td>true</td>
@@ -495,6 +523,29 @@ Apart from these, the following properties are also 
available, and may be useful
   <td>
     Whether to overwrite files added through SparkContext.addFile() when the 
target file exists and its contents do not match those of the source.
   </td>
+</tr>
+<tr>
+  <td>spark.authenticate</td>
+  <td>false</td>
+  <td>
+    Whether Spark authenticates its internal connections. See <code>spark.authenticate.secret</code> if not
+    running on Yarn.
+  </td>
+</tr>
+<tr>  
+  <td>spark.authenticate.secret</td>
+  <td>None</td>
+  <td>
+    Set the secret key used for Spark to authenticate between components. This 
needs to be set if
+    not running on Yarn and authentication is enabled.
+  </td>
+</tr>
+<tr>  
+  <td>spark.core.connection.auth.wait.timeout</td>
+  <td>30</td>
+  <td>
+    Number of seconds for the connection to wait for authentication to occur 
before timing
+    out and giving up. 
+  </td>
 </tr>
 </table>
 

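As a rough illustration of how the new properties combine, a hedged sketch — the secret, user names, and master below are placeholders, not values taken from this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Shared-secret authentication plus UI view acls; off-YARN the secret must be set explicitly.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("secured-app")
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secret", "change-me")   // placeholder secret
      .set("spark.ui.acls.enable", "true")
      .set("spark.ui.view.acls", "alice,bob")          // placeholder users
    // spark.ui.filters and its per-filter params are documented above as Java system
    // properties, e.g. -Dspark.ui.filters=com.test.filter1 on the driver JVM.
    val sc = new SparkContext(conf)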
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 4eb297d..c4f4d79 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -103,6 +103,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) 
users will have to bui
 
 * [Configuration](configuration.html): customize Spark via its configuration 
system
 * [Tuning Guide](tuning.html): best practices to optimize performance and 
memory use
+* [Security](security.html): Spark security support
 * [Hardware Provisioning](hardware-provisioning.html): recommendations for 
cluster hardware
 * [Job Scheduling](job-scheduling.html): scheduling resources across and 
within Spark applications
 * [Building Spark with Maven](building-with-maven.html): build Spark using the 
Maven system

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
new file mode 100644
index 0000000..9e4218f
--- /dev/null
+++ b/docs/security.md
@@ -0,0 +1,18 @@
+---
+layout: global
+title: Spark Security
+---
+
+Spark currently supports authentication via a shared secret. Authentication can be turned on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical, they will not be allowed to communicate.
+
+The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user, and once the user is logged in, Spark can compare that user against the view acls to make sure they are authorized to view the UI. The configs `spark.ui.acls.enable` and `spark.ui.view.acls` control the behavior of the acls. Note that the person who started the application always has view access to the UI.
+
+For Spark on Yarn deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the acl controls can be used to control which users can view the Spark UI.
+
+For other types of Spark deployments, the Spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Masters/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acl controls can be used to control which users can view the Spark UI.
+
+IMPORTANT NOTE: The NettyBlockFetcherIterator is not secured, so do not use netty for the shuffle if running with authentication on.
+
+See [Spark Configuration](configuration.html) for more details on the security 
configs.
+
+See <a 
href="api/core/index.html#org.apache.spark.SecurityManager"><code>org.apache.spark.SecurityManager</code></a>
 for implementation details about security.

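Since security.md points at standard javax servlet filters for UI authentication, here is a minimal, hypothetical filter sketch; the class name and trusted header are invented for illustration, and it assumes the UI's acl check reads the request's remote user:

    import javax.servlet._
    import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper}

    // Hypothetical filter: trusts an upstream proxy header to identify the user and
    // exposes it via getRemoteUser(), so the view-acl check can see who is logged in.
    class ExampleAuthFilter extends Filter {
      def init(config: FilterConfig): Unit = {}
      def destroy(): Unit = {}
      def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
        val http = req.asInstanceOf[HttpServletRequest]
        val user = http.getHeader("X-Authenticated-User")   // assumed header name
        val wrapped = new HttpServletRequestWrapper(http) {
          override def getRemoteUser(): String = user
        }
        chain.doFilter(wrapped, res)
      }
    }

Such a filter would then be enabled through spark.ui.filters as described in the configuration section.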
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 3d7b390..62d3a52 100644
--- 
a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -23,7 +23,7 @@ import scala.util.Random
 
 import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SecurityManager}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.receivers.Receiver
@@ -112,8 +112,9 @@ object FeederActor {
     }
     val Seq(host, port) = args.toSeq
 
-
-    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, 
conf = new SparkConf)._1
+    val conf = new SparkConf
+    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, 
conf = conf,
+      securityManager = new SecurityManager(conf))._1
     val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
 
     println("Feeder started as:" + feeder)

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c59fada..3b86385 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,21 @@
     <dependencies>
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-util</artifactId>
+        <version>7.6.8.v20121106</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-security</artifactId>
+        <version>7.6.8.v20121106</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-plus</artifactId>
+        <version>7.6.8.v20121106</version>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-server</artifactId>
         <version>7.6.8.v20121106</version>
       </dependency>
@@ -296,6 +311,11 @@
         <version>${mesos.version}</version>
       </dependency>
       <dependency>
+        <groupId>commons-net</groupId>
+        <artifactId>commons-net</artifactId>
+        <version>2.2</version>
+      </dependency>
+      <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.0.17.Final</version>

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa17848..138aad7 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -226,6 +226,9 @@ object SparkBuild extends Build {
     libraryDependencies ++= Seq(
         "io.netty"          % "netty-all"       % "4.0.17.Final",
         "org.eclipse.jetty" % "jetty-server"    % "7.6.8.v20121106",
+        "org.eclipse.jetty" % "jetty-util" % "7.6.8.v20121106",
+        "org.eclipse.jetty" % "jetty-plus" % "7.6.8.v20121106",
+        "org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
         /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. 
Fixed in ivy 2.3.0. */
         "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" 
artifacts Artifact("javax.servlet", "jar", "jar"),
         "org.scalatest"    %% "scalatest"       % "1.9.1"  % "test",
@@ -285,6 +288,7 @@ object SparkBuild extends Build {
         "it.unimi.dsi"               % "fastutil"         % "6.4.4",
         "colt"                       % "colt"             % "1.2.0",
         "org.apache.mesos"           % "mesos"            % "0.13.0",
+        "commons-net"                % "commons-net"      % "2.2",
         "net.java.dev.jets3t"        % "jets3t"           % "0.7.1" 
excludeAll(excludeCommonsLogging),
         "org.apache.derby"           % "derby"            % "10.4.2.0"         
            % "test",
         "org.apache.hadoop"          % hadoopClient       % hadoopVersion 
excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J),

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git 
a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala 
b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index e3bcf7f..1aa9407 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -18,12 +18,15 @@
 package org.apache.spark.repl
 
 import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader, URLEncoder}
+import java.net.{URI, URL, URLEncoder}
 import java.util.concurrent.{Executors, ExecutorService}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.util.Utils
+
 import org.objectweb.asm._
 import org.objectweb.asm.Opcodes._
 
@@ -53,7 +56,13 @@ extends ClassLoader(parent) {
         if (fileSystem != null) {
           fileSystem.open(new Path(directory, pathInDirectory))
         } else {
-          new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+          if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+            val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
+            val newuri = Utils.constructURIForAuthentication(uri, 
SparkEnv.get.securityManager)
+            newuri.toURL().openStream()
+          } else {
+            new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+          }
         }
       }
       val bytes = readAndTransformClass(name, inputStream)

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index f52ebe4..9b1da19 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -881,6 +881,8 @@ class SparkILoop(in0: Option[BufferedReader], protected val 
out: JPrintWriter,
       })
 
   def process(settings: Settings): Boolean = savingContextLoader {
+    if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", 
"true")
+
     this.settings = settings
     createInterpreter()
 
@@ -939,16 +941,9 @@ class SparkILoop(in0: Option[BufferedReader], protected 
val out: JPrintWriter,
 
   def createSparkContext(): SparkContext = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val master = this.master match {
-      case Some(m) => m
-      case None => {
-        val prop = System.getenv("MASTER")
-        if (prop != null) prop else "local"
-      }
-    }
     val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
     val conf = new SparkConf()
-      .setMaster(master)
+      .setMaster(getMaster())
       .setAppName("Spark shell")
       .setJars(jars)
       .set("spark.repl.class.uri", intp.classServer.uri)
@@ -963,6 +958,17 @@ class SparkILoop(in0: Option[BufferedReader], protected 
val out: JPrintWriter,
     sparkContext
   }
 
+  private def getMaster(): String = {
+    val master = this.master match {
+      case Some(m) => m
+      case None => {
+        val prop = System.getenv("MASTER")
+        if (prop != null) prop else "local"
+      }
+    }
+    master
+  }
+
   /** process command-line arguments and do as they request */
   def process(args: Array[String]): Boolean = {
     val command = new SparkCommandLine(args.toList, msg => echo(msg))

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala 
b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 1d73d0b..90a96ad 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -36,7 +36,7 @@ import scala.tools.reflect.StdRuntimeTags._
 import scala.util.control.ControlThrowable
 import util.stackTraceString
 
-import org.apache.spark.{HttpServer, SparkConf, Logging}
+import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf}
 import org.apache.spark.util.Utils
 
 // /** directory to save .class files to */
@@ -83,15 +83,17 @@ import org.apache.spark.util.Utils
    *  @author Moez A. Abdel-Gawad
    *  @author Lex Spoon
    */
-  class SparkIMain(initialSettings: Settings, val out: JPrintWriter) extends 
SparkImports with Logging {
+  class SparkIMain(initialSettings: Settings, val out: JPrintWriter)
+      extends SparkImports with Logging {
     imain =>
 
-      val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == 
"1")
+    val conf = new SparkConf()
 
+    val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
       /** Local directory to save .class files too */
       val outputDir = {
         val tmp = System.getProperty("java.io.tmpdir")
-        val rootDir = new SparkConf().get("spark.repl.classdir",  tmp)
+        val rootDir = conf.get("spark.repl.classdir",  tmp)
         Utils.createTempDir(rootDir)
       }
       if (SPARK_DEBUG_REPL) {
@@ -99,7 +101,8 @@ import org.apache.spark.util.Utils
       }
 
     val virtualDirectory                              = new 
PlainFile(outputDir) // "directory" for classfiles
-    val classServer                                   = new 
HttpServer(outputDir)     /** Jetty server that will serve our classes to 
worker nodes */
+    val classServer                                   = new 
HttpServer(outputDir,
+      new SecurityManager(conf)) /** Jetty server that will serve our classes 
to worker nodes */
     private var currentSettings: Settings             = initialSettings
     var printResults                                  = true      // whether 
to print result lines
     var totalSilence                                  = false     // whether 
to print anything

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e045b9f..bb574f4 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
@@ -36,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -87,27 +86,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
     resourceManager = registerWithResourceManager()
 
-    // Workaround until hadoop moves to something which has
-    // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in 
(2.0.2-alpha but no 0.23 line)
-    // ignore result.
-    // This does not, unfortunately, always work reliably ... but alleviates 
the bug a lot of times
-    // Hence args.workerCores = numCore disabled above. Any better option?
-
-    // Compute number of threads for akka
-    //val minimumMemory = 
appMasterResponse.getMinimumResourceCapability().getMemory()
-    //if (minimumMemory > 0) {
-    //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-    //  val numCore = (mem  / minimumMemory) + (if (0 != (mem % 
minimumMemory)) 1 else 0)
-
-    //  if (numCore > 0) {
-        // do not override - hits 
https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-    //  }
-    //}
-    // 
org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+    // setup AmIpFilter for the SparkUI - do this before we start the UI
+    addAmIpFilter()
 
     ApplicationMaster.register(this)
+
+    // Call this to force generation of secret so it gets populated into the
+    // hadoop UGI. This has to happen before the startUserClass which does a
+    // doAs in order for the credentials to be passed on to the worker 
containers.
+    val securityMgr = new SecurityManager(sparkConf)
+
     // Start the user's JAR
     userThread = startUserClass()
 
@@ -132,6 +120,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
     System.exit(0)
   }
 
+  // add the yarn amIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    System.setProperty("spark.ui.filters", amFilter)
+    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+    val parts : Array[String] = proxy.split(":")
+    val uriBase = "http://" + proxy +
+      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    
System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
+      params)
+  }
+
   /** Get the Yarn approved local directories. */
   private def getLocalDirs(): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 138c279..b735d01 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -50,8 +50,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: 
Configuration, spar
   private var yarnAllocator: YarnAllocationHandler = _
   private var driverClosed:Boolean = false
 
+  val securityManager = new SecurityManager(sparkConf)
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", 
Utils.localHostName, 0,
-    conf = sparkConf)._1
+    conf = sparkConf, securityManager = securityManager)._1
   var actor: ActorRef = _
 
   // This actor just working as a monitor to watch on Driver Actor.
@@ -110,6 +111,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration, spar
     // we want to be reasonably responsive without causing too many requests 
to RM.
     val schedulerInterval =
       System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", 
"5000").toLong
+
     // must be <= timeoutInterval / 2.
     val interval = math.min(timeoutInterval / 2, schedulerInterval)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fe37168..11322b1 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -134,7 +134,7 @@ class ClientArguments(val args: Array[String], val 
sparkConf: SparkConf) {
       "  --args ARGS                Arguments to be passed to your 
application's main class.\n" +
       "                             Mutliple invocations are possible, each 
will be passed in order.\n" +
       "  --num-workers NUM          Number of workers to start (Default: 2)\n" 
+
-      "  --worker-cores NUM         Number of cores for the workers (Default: 
1). This is unsused right now.\n" +
+      "  --worker-cores NUM         Number of cores for the workers (Default: 
1).\n" +
       "  --master-class CLASS_NAME  Class Name for Master (Default: 
spark.deploy.yarn.ApplicationMaster)\n" +
       "  --master-memory MEM        Memory for Master (e.g. 1000M, 2G) 
(Default: 512 Mb)\n" +
       "  --worker-memory MEM        Memory per Worker (e.g. 1000M, 2G) 
(Default: 1G)\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d6c12a9..4c6e1dc 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.deploy.yarn
 
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * Contains util methods to interact with Hadoop from spark.
@@ -44,4 +46,24 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val jobCreds = conf.getCredentials()
     jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
+
+  override def getCurrentUserCredentials(): Credentials = { 
+    UserGroupInformation.getCurrentUser().getCredentials()
+  }
+
+  override def addCurrentUserCredentials(creds: Credentials) {
+    UserGroupInformation.getCurrentUser().addCredentials(creds)
+  }
+
+  override def addSecretKeyToUserCredentials(key: String, secret: String) {
+    val creds = new Credentials()
+    creds.addSecretKey(new Text(key), secret.getBytes())
+    addCurrentUserCredentials(creds)
+  }
+
+  override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+    val credentials = getCurrentUserCredentials()
+    if (credentials != null) credentials.getSecretKey(new Text(key)) else null
+  }
+
 }

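The new credential helpers round-trip a secret through the current user's Hadoop UGI, which is how the auto-generated YARN secret can reach the worker containers. A small hedged sketch of how they compose (the key and value are placeholders):

    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    // Store a secret in the current UGI's credentials, then read it back.
    val util = new YarnSparkHadoopUtil
    util.addSecretKeyToUserCredentials("example-secret-key", "not-a-real-secret")
    val secretBytes = util.getSecretKeyFromUserCredentials("example-secret-key")
    assert(secretBytes != null && new String(secretBytes) == "not-a-real-secret")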
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dd117d5..b48a2d5 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConversions._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -37,8 +36,9 @@ import 
org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -91,12 +91,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
     amClient.init(yarnConf)
     amClient.start()
 
-    // Workaround until hadoop moves to something which has
-    // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in 
(2.0.2-alpha but no 0.23 line)
-    // 
org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+    // setup AmIpFilter for the SparkUI - do this before we start the UI
+    addAmIpFilter()
 
     ApplicationMaster.register(this)
 
+    // Call this to force generation of secret so it gets populated into the
+    // hadoop UGI. This has to happen before the startUserClass which does a
+    // doAs in order for the credentials to be passed on to the worker 
containers.
+    val securityMgr = new SecurityManager(sparkConf)
+
     // Start the user's JAR
     userThread = startUserClass()
 
@@ -121,6 +125,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
     System.exit(0)
   }
 
+  // add the yarn amIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    System.setProperty("spark.ui.filters", amFilter)
+    val proxy = WebAppUtils.getProxyHostAndPort(conf)
+    val parts : Array[String] = proxy.split(":")
+    val uriBase = "http://" + proxy +
+      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+
+    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    
System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params",
 params)
+  }
+
   /** Get the Yarn approved local directories. */
   private def getLocalDirs(): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the
@@ -261,7 +278,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, 
conf: Configuration,
       val schedulerInterval =
         sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
 
-
       // must be <= timeoutInterval / 2.
       val interval = math.min(timeoutInterval / 2, schedulerInterval)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 40600f3..f1c1fea 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -52,8 +52,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: 
Configuration, spar
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
+  val securityManager = new SecurityManager(sparkConf)
   val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", 
Utils.localHostName, 0,
-    conf = sparkConf)._1
+    conf = sparkConf, securityManager = securityManager)._1
   var actor: ActorRef = _
 
   // This actor just working as a monitor to watch on Driver Actor.
@@ -105,6 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, 
conf: Configuration, spar
     val interval = math.min(timeoutInterval / 2, schedulerInterval)
 
     reporterThread = launchReporterThread(interval)
+    
 
     // Wait for the reporter thread to Finish.
     reporterThread.join()
