Repository: spark
Updated Branches:
  refs/heads/master ee7f30856 -> 09f7e4587


[SPARK-2157] Enable tight firewall rules for Spark

The goal of this PR is to allow users of Spark to write tight firewall rules 
for their clusters. This is currently not possible because Spark uses random 
ports in many places, notably the communication between executors and drivers. 
The changes in this PR are based on top of ash211's changes in #1107.

The list covered here may or may not be the complete set of ports needed for 
Spark to operate perfectly. However, as of the latest commit there are no known 
sources of random ports (except in tests). I have not documented a few of the 
more obscure configs.

My spark-env.sh looks like this:
```
export SPARK_MASTER_PORT=6060
export SPARK_WORKER_PORT=7070
export SPARK_MASTER_WEBUI_PORT=9090
export SPARK_WORKER_WEBUI_PORT=9091
```
and my spark-defaults.conf looks like this:
```
spark.master spark://andrews-mbp:6060
spark.driver.port 5001
spark.fileserver.port 5011
spark.broadcast.port 5021
spark.replClassServer.port 5031
spark.blockManager.port 5041
spark.executor.port 5051
```
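
Note that when a configured port is already taken, a service falls back to the next port, up to spark.ports.maxRetries times (see Utils.startServiceOnPort in the diff), so a firewall rule may need to allow a small range per service rather than a single port. A minimal sketch of that arithmetic, assuming the non-test default of 16 retries; `PortRange` and `candidatePorts` are hypothetical names used for illustration and are not part of Spark:

```scala
// Hypothetical helper, not part of Spark: lists the ports a service may end up
// bound to when it starts at `startPort` and retries on port collision.
object PortRange {
  // Mirrors the retry arithmetic of Utils.startServiceOnPort in this commit:
  // offsets 0..maxRetries, wrapping at 65536; port 0 is never incremented.
  def candidatePorts(startPort: Int, maxRetries: Int = 16): Seq[Int] =
    if (startPort == 0) Seq(0)
    else (0 to maxRetries).map(offset => (startPort + offset) % 65536)

  def main(args: Array[String]): Unit = {
    // Range a firewall rule would need to cover for spark.driver.port 5001
    val driverPorts = candidatePorts(5001)
    println(s"${driverPorts.head}-${driverPorts.last}") // prints 5001-5017
  }
}
```

With the spark-defaults.conf above, spark.driver.port 5001 would therefore map to 5001-5017 under the default retry count. Setting spark.ports.maxRetries lower narrows the range, at the cost of failing sooner on collisions.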

Author: Andrew Or <andrewo...@gmail.com>
Author: Andrew Ash <and...@andrewash.com>

Closes #1777 from andrewor14/configure-ports and squashes the following commits:

621267b [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
configure-ports
8a6b820 [Andrew Or] Use a random UI port during tests
7da0493 [Andrew Or] Fix tests
523c30e [Andrew Or] Add test for isBindCollision
b97b02a [Andrew Or] Minor fixes
c22ad00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
configure-ports
93d359f [Andrew Or] Executors connect to wrong port when collision occurs
d502e5f [Andrew Or] Handle port collisions when creating Akka systems
a2dd05c [Andrew Or] Patrick's comment nit
86461e2 [Andrew Or] Remove spark.executor.env.port and 
spark.standalone.client.port
1d2d5c6 [Andrew Or] Fix ports for standalone cluster mode
cb3be88 [Andrew Or] Various doc fixes (broken link, format etc.)
e837cde [Andrew Or] Remove outdated TODOs
bfbab28 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
configure-ports
de1b207 [Andrew Or] Update docs to reflect new ports
b565079 [Andrew Or] Add spark.ports.maxRetries
2551eb2 [Andrew Or] Remove spark.worker.watcher.port
151327a [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
configure-ports
9868358 [Andrew Or] Add a few miscellaneous ports
6016e77 [Andrew Or] Add spark.executor.port
8d836e6 [Andrew Or] Also document SPARK_{MASTER/WORKER}_WEBUI_PORT
4d9e6f3 [Andrew Or] Fix super subtle bug
3f8e51b [Andrew Or] Correct erroneous docs...
e111d08 [Andrew Or] Add names for UI services
470f38c [Andrew Or] Special case non-"Address already in use" exceptions
1d7e408 [Andrew Or] Treat 0 ports specially + return correct ConnectionManager 
port
ba32280 [Andrew Or] Minor fixes
6b550b0 [Andrew Or] Assorted fixes
73fbe89 [Andrew Or] Move start service logic to Utils
ec676f4 [Andrew Or] Merge branch 'SPARK-2157' of github.com:ash211/spark into 
configure-ports
038a579 [Andrew Ash] Trust the server start function to report the port the 
service started on
7c5bdc4 [Andrew Ash] Fix style issue
0347aef [Andrew Ash] Unify port fallback logic to a single place
24a4c32 [Andrew Ash] Remove type on val to match surrounding style
9e4ad96 [Andrew Ash] Reformat for style checker
5d84e0e [Andrew Ash] Document new port configuration options
066dc7a [Andrew Ash] Fix up HttpServer port increments
cad16da [Andrew Ash] Add fallover increment logic for HttpServer
c5a0568 [Andrew Ash] Fix ConnectionManager to retry with increment
b80d2fd [Andrew Ash] Make Spark's block manager port configurable
17c79bb [Andrew Ash] Add a configuration option for spark-shell's class server
f34115d [Andrew Ash] SPARK-1176 Add port configuration for HttpBroadcast
49ee29b [Andrew Ash] SPARK-1174 Add port configuration for HttpFileServer
1c0981a [Andrew Ash] Make port in HttpServer configurable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09f7e458
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09f7e458
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09f7e458

Branch: refs/heads/master
Commit: 09f7e4587bbdf74207d2629e8c1314f93d865999
Parents: ee7f308
Author: Andrew Or <andrewo...@gmail.com>
Authored: Wed Aug 6 00:07:40 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Wed Aug 6 00:07:40 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpFileServer.scala |   7 +-
 .../scala/org/apache/spark/HttpServer.scala     |  88 ++++++++-----
 .../main/scala/org/apache/spark/SparkConf.scala |  10 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |  12 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |   3 +-
 .../scala/org/apache/spark/deploy/Client.scala  |   2 -
 .../spark/deploy/master/ui/MasterWebUI.scala    |   2 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   3 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   5 +-
 .../spark/network/ConnectionManager.scala       |  14 +-
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../scala/org/apache/spark/ui/JettyUtils.scala  |  26 ++--
 .../scala/org/apache/spark/ui/SparkUI.scala     |   2 +-
 .../main/scala/org/apache/spark/ui/WebUI.scala  |   5 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  24 +++-
 .../scala/org/apache/spark/util/Utils.scala     |  73 ++++++++++-
 .../org/apache/spark/util/UtilsSuite.scala      |  34 ++++-
 docs/configuration.md                           |  46 +++++++
 docs/security.md                                | 131 ++++++++++++++++++-
 docs/spark-standalone.md                        |  92 +------------
 project/SparkBuild.scala                        |   2 +
 .../org/apache/spark/repl/SparkIMain.scala      |   3 +-
 22 files changed, 416 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala 
b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 0e3750f..edc3889 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -23,7 +23,10 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer(securityManager: SecurityManager) extends 
Logging {
+private[spark] class HttpFileServer(
+    securityManager: SecurityManager,
+    requestedPort: Int = 0)
+  extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: 
SecurityManager) extends Lo
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager)
+    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala 
b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 7e9b517..912558d 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.security.{ConstraintMapping, 
ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+import org.eclipse.jetty.security.{ConstraintMapping, 
ConstraintSecurityHandler, HashLoginService}
 
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,49 +41,69 @@ private[spark] class ServerStateException(message: String) 
extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. 
This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File, securityManager: 
SecurityManager)
-    extends Logging {
+private[spark] class HttpServer(
+    resourceBase: File,
+    securityManager: SecurityManager,
+    requestedPort: Int = 0,
+    serverName: String = "HTTP server")
+  extends Logging {
+
   private var server: Server = null
-  private var port: Int = -1
+  private var port: Int = requestedPort
 
   def start() {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
       logInfo("Starting HTTP Server")
-      server = new Server()
-      val connector = new SocketConnector
-      connector.setMaxIdleTime(60*1000)
-      connector.setSoLingerTime(-1)
-      connector.setPort(0)
-      server.addConnector(connector)
-
-      val threadPool = new QueuedThreadPool
-      threadPool.setDaemon(true)
-      server.setThreadPool(threadPool)
-      val resHandler = new ResourceHandler
-      resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
-      val handlerList = new HandlerList
-      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-
-      if (securityManager.isAuthenticationEnabled()) {
-        logDebug("HttpServer is using security")
-        val sh = setupSecurityHandler(securityManager)
-        // make sure we go through security handler to get resources
-        sh.setHandler(handlerList)
-        server.setHandler(sh)
-      } else {
-        logDebug("HttpServer is not using security")
-        server.setHandler(handlerList)
-      }
-
-      server.start()
-      port = server.getConnectors()(0).getLocalPort()
+      val (actualServer, actualPort) =
+        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+      server = actualServer
+      port = actualPort
     }
   }
 
   /**
+   * Actually start the HTTP server on the given port.
+   *
+   * Note that this is only best effort in the sense that we may end up binding to a nearby port
+   * in the event of port collision. Return the bound server and the actual port used.
+   */
+  private def doStart(startPort: Int): (Server, Int) = {
+    val server = new Server()
+    val connector = new SocketConnector
+    connector.setMaxIdleTime(60 * 1000)
+    connector.setSoLingerTime(-1)
+    connector.setPort(startPort)
+    server.addConnector(connector)
+
+    val threadPool = new QueuedThreadPool
+    threadPool.setDaemon(true)
+    server.setThreadPool(threadPool)
+    val resHandler = new ResourceHandler
+    resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+
+    if (securityManager.isAuthenticationEnabled()) {
+      logDebug("HttpServer is using security")
+      val sh = setupSecurityHandler(securityManager)
+      // make sure we go through security handler to get resources
+      sh.setHandler(handlerList)
+      server.setHandler(sh)
+    } else {
+      logDebug("HttpServer is not using security")
+      server.setHandler(handlerList)
+    }
+
+    server.start()
+    val actualPort = server.getConnectors()(0).getLocalPort
+
+    (server, actualPort)
+  }
+
+  /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the 
password
    * isn't passed in plaintext.
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, 
securityManager: SecurityMan
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      return "http://" + Utils.localIpAddress + ":" + port
+      "http://" + Utils.localIpAddress + ":" + port
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cce7a23..13f0bff 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
    * the scheduler, while the rest of the spark configs can be inherited from 
the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+    isAkkaConf(name) ||
+    name.startsWith("spark.akka") ||
+    name.startsWith("spark.auth") ||
+    isSparkPortConf(name)
   }
+
+  /**
+   * Return whether the given config is a Spark port config.
+   */
+  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index dd8e4ac..9d4edeb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.concurrent.Await
 import scala.util.Properties
 
 import akka.actor._
@@ -151,10 +150,10 @@ object SparkEnv extends Logging {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", 
hostname, port, conf = conf,
       securityManager = securityManager)
 
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind 
to any free port),
-    // figure out which port number Akka actually bound to and set 
spark.driver.port to it.
-    if (isDriver && port == 0) {
-      conf.set("spark.driver.port",  boundPort.toString)
+    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // This is so that we tell the executors the correct port to connect to.
+    if (isDriver) {
+      conf.set("spark.driver.port", boundPort.toString)
     }
 
     // Create an instance of the class named by the given Java system 
property, or by
@@ -222,7 +221,8 @@ object SparkEnv extends Logging {
 
     val httpFileServer =
       if (isDriver) {
-        val server = new HttpFileServer(securityManager)
+        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val server = new HttpFileServer(securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri",  server.serverUri)
         server

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4874564..942dc7d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -152,7 +152,8 @@ private[broadcast] object HttpBroadcast extends Logging {
 
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
-    server = new HttpServer(broadcastDir, securityManager)
+    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 17c507a..c070037 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -155,8 +155,6 @@ object Client {
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", 
"WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    // TODO: See if we can initialize akka so return messages are sent back 
using the same TCP
-    //       flow. Else, this (sadly) requires the DriverClient be routable 
from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0, conf, new 
SecurityManager(conf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 16aa049..d86ec1e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = 
"MasterUI") with Logging {
 
   val masterActorRef = master.self
   val timeout = AkkaUtils.askTimeout(master.conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index a9f531e..47fbda6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.AkkaUtils
@@ -34,7 +35,7 @@ class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
     port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), 
worker.conf)
+  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, 
name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index af736de..1f46a0f 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -115,8 +115,9 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
+      val port = executorConf.getInt("spark.executor.port", 0)
       val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher", hostname, 0, executorConf, new 
SecurityManager(executorConf))
+        "driverPropsFetcher", hostname, port, executorConf, new 
SecurityManager(executorConf))
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
@@ -126,7 +127,7 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       // Create a new ActorSystem using driver's Spark properties to run the 
backend.
       val driverConf = new SparkConf().setAll(props)
       val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-        "sparkExecutor", hostname, 0, driverConf, new 
SecurityManager(driverConf))
+        "sparkExecutor", hostname, port, driverConf, new 
SecurityManager(driverConf))
       // set it
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 566e8a4..4c00225 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -38,8 +38,12 @@ import scala.language.postfixOps
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
-private[spark] class ConnectionManager(port: Int, conf: SparkConf,
-    securityManager: SecurityManager) extends Logging {
+private[spark] class ConnectionManager(
+    port: Int,
+    conf: SparkConf,
+    securityManager: SecurityManager,
+    name: String = "Connection manager")
+  extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -105,7 +109,11 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)
 
-  serverChannel.socket.bind(new InetSocketAddress(port))
+  private def startService(port: Int): (ServerSocketChannel, Int) = {
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    (serverChannel, serverChannel.socket.getLocalPort)
+  }
+  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
   serverChannel.register(selector, SelectionKey.OP_ACCEPT)
 
   val id = new ConnectionManagerId(Utils.localHostName, 
serverChannel.socket.getLocalPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c0a0601..3876cf4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -60,10 +60,12 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
+  private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
-  val connectionManager = new ConnectionManager(0, conf, securityManager)
+  val connectionManager =
+    new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
 
   implicit val futureExecContext = connectionManager.futureExecContext
 

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index a2535e3..29e9cf9 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -174,40 +174,32 @@ private[spark] object JettyUtils extends Logging {
       hostName: String,
       port: Int,
       handlers: Seq[ServletContextHandler],
-      conf: SparkConf): ServerInfo = {
+      conf: SparkConf,
+      serverName: String = ""): ServerInfo = {
 
     val collection = new ContextHandlerCollection
     collection.setHandlers(handlers.toArray)
     addFilters(handlers, conf)
 
-    @tailrec
+    // Bind to the given port, or throw a java.net.BindException if the port is occupied
     def connect(currentPort: Int): (Server, Int) = {
       val server = new Server(new InetSocketAddress(hostName, currentPort))
       val pool = new QueuedThreadPool
       pool.setDaemon(true)
       server.setThreadPool(pool)
       server.setHandler(collection)
-
-      Try {
+      try {
         server.start()
-      } match {
-        case s: Success[_] =>
-          (server, server.getConnectors.head.getLocalPort)
-        case f: Failure[_] =>
-          val nextPort = (currentPort + 1) % 65536
+        (server, server.getConnectors.head.getLocalPort)
+      } catch {
+        case e: Exception =>
           server.stop()
           pool.stop()
-          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
-          if (f.toString.contains("Address already in use")) {
-            logWarning(s"$msg - $f")
-          } else {
-            logError(msg, f.exception)
-          }
-          connect(nextPort)
+          throw e
       }
     }
 
-    val (server, boundPort) = connect(port)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
     ServerInfo(server, boundPort, collection)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 097a1b8..6c788a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -36,7 +36,7 @@ private[spark] class SparkUI(
     val listenerBus: SparkListenerBus,
     var appName: String,
     val basePath: String = "")
-  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
+  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, 
"SparkUI")
   with Logging {
 
   def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, 
sc.listenerBus, sc.appName)

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 856273e..5f52f95 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -39,7 +39,8 @@ private[spark] abstract class WebUI(
     securityManager: SecurityManager,
     port: Int,
     conf: SparkConf,
-    basePath: String = "")
+    basePath: String = "",
+    name: String = "")
   extends Logging {
 
   protected val tabs = ArrayBuffer[WebUITab]()
@@ -97,7 +98,7 @@ private[spark] abstract class WebUI(
   def bind() {
     assert(!serverInfo.isDefined, "Attempted to bind %s more than 
once!".format(className))
     try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
       logInfo("Started %s at http://%s:%d".format(className, publicHostName, 
boundPort))
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index feafd65..d6afb73 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
 
 import com.typesafe.config.ConfigFactory
@@ -44,14 +44,28 @@ private[spark] object AkkaUtils extends Logging {
    * If indestructible is set to true, the Actor System will continue running 
in the event
    * of a fatal exception. This is used by 
[[org.apache.spark.executor.Executor]].
    */
-  def createActorSystem(name: String, host: String, port: Int,
-    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
+  def createActorSystem(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): (ActorSystem, Int) = {
+    val startService: Int => (ActorSystem, Int) = { actualPort =>
+      doCreateActorSystem(name, host, actualPort, conf, securityManager)
+    }
+    Utils.startServiceOnPort(port, startService, name)
+  }
+
+  private def doCreateActorSystem(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): (ActorSystem, Int) = {
 
     val akkaThreads   = conf.getInt("spark.akka.threads", 4)
     val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
-
     val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = 
conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 30073a8..c60be4f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
-import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, 
URLConnection}
+import java.net._
 import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, 
ThreadPoolExecutor}
@@ -1331,4 +1331,75 @@ private[spark] object Utils extends Logging {
       .map { case (k, v) => s"-D$k=$v" }
   }
 
+  /**
+   * Default number of retries in binding to a port.
+   */
+  val portMaxRetries: Int = {
+    if (sys.props.contains("spark.testing")) {
+      // Set a higher number of retries for tests...
+      sys.props.get("spark.ports.maxRetries").map(_.toInt).getOrElse(100)
+    } else {
+      Option(SparkEnv.get)
+        .flatMap(_.conf.getOption("spark.ports.maxRetries"))
+        .map(_.toInt)
+        .getOrElse(16)
+    }
+  }
+
+  /**
+   * Attempt to start a service on the given port, or fail after a number of attempts.
+   * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
+   *
+   * @param startPort The initial port to start the service on.
+   * @param maxRetries Maximum number of retries to attempt.
+   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
+   * @param startService Function to start service on a given port.
+   *                     This is expected to throw java.net.BindException on port collision.
+   */
+  def startServiceOnPort[T](
+      startPort: Int,
+      startService: Int => (T, Int),
+      serviceName: String = "",
+      maxRetries: Int = portMaxRetries): (T, Int) = {
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    for (offset <- 0 to maxRetries) {
+      // Do not increment port if startPort is 0, which is treated as a special port
+      val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
+      try {
+        val (service, port) = startService(tryPort)
+        logInfo(s"Successfully started service$serviceString on port $port.")
+        return (service, port)
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          if (offset >= maxRetries) {
+            val exceptionMessage =
+              s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
+            val exception = new BindException(exceptionMessage)
+            // restore original stack trace
+            exception.setStackTrace(e.getStackTrace)
+            throw exception
+          }
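+          // Report the port that will actually be tried next (0 stays 0; others wrap at 65536)
+          val nextPort = if (startPort == 0) startPort else (tryPort + 1) % 65536
+          logWarning(s"Service$serviceString could not bind on port $tryPort. " +
+            s"Attempting port $nextPort.")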
+      }
+    }
+    // Should never happen
+    throw new SparkException(s"Failed to start service$serviceString on port $startPort")
+  }
+
+  /**
+   * Return whether the exception is caused by an address-port collision when binding.
+   */
+  def isBindCollision(exception: Throwable): Boolean = {
+    exception match {
+      case e: BindException =>
+        if (e.getMessage != null && e.getMessage.contains("Address already in use")) {
+          return true
+        }
+        isBindCollision(e.getCause)
+      case e: Exception => isBindCollision(e.getCause)
+      case _ => false
+    }
+  }
+
 }

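The retry behaviour of `startServiceOnPort` can be tried outside Spark. The following is a minimal sketch, not the code from this commit: `PortRetrySketch`, `startOnPort`, and `fakeStart` are hypothetical names, logging is omitted, and every `BindException` is treated as a collision (the commit additionally inspects the message through `isBindCollision`).

```scala
import java.net.BindException

object PortRetrySketch {
  // Hypothetical, dependency-free stand-in for Utils.startServiceOnPort.
  // Tries startPort, startPort + 1, ..., startPort + maxRetries (mod 65536);
  // port 0 is never incremented because it asks the OS for any free port.
  def startOnPort[T](startPort: Int, maxRetries: Int)(start: Int => (T, Int)): (T, Int) = {
    @annotation.tailrec
    def attempt(offset: Int): (T, Int) = {
      val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
      // Assumption: every BindException counts as a collision. Once the retries
      // are used up the guard fails and the exception propagates to the caller.
      val started: Option[(T, Int)] =
        try Some(start(tryPort))
        catch { case _: BindException if offset < maxRetries => None }
      started match {
        case Some(result) => result
        case None => attempt(offset + 1)
      }
    }
    attempt(0)
  }

  // Fake service for illustration: pretends ports below 5003 are already taken.
  def fakeStart(port: Int): (String, Int) =
    if (port < 5003) throw new BindException("Address already in use")
    else ("service", port)
}
```

With this fake, `PortRetrySketch.startOnPort[String](5001, 16)(PortRetrySketch.fakeStart)` returns `("service", 5003)` after two failed attempts, while the same call with `maxRetries = 1` lets the `BindException` propagate.
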
http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1ee936b..70d423b 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import scala.util.Random
 
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
-import java.net.URI
+import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
@@ -265,4 +265,36 @@ class UtilsSuite extends FunSuite {
       Array("hdfs:/a.jar", "s3:/another.jar"))
   }
 
+  test("isBindCollision") {
+    // Negatives
+    assert(!Utils.isBindCollision(null))
+    assert(!Utils.isBindCollision(new Exception))
+    assert(!Utils.isBindCollision(new Exception(new Exception)))
+    assert(!Utils.isBindCollision(new Exception(new BindException)))
+    assert(!Utils.isBindCollision(new Exception(new BindException("Random message"))))
+
+    // Positives
+    val be = new BindException("Address already in use")
+    val be1 = new Exception(new BindException("Address already in use"))
+    val be2 = new Exception(new Exception(new BindException("Address already in use")))
+    assert(Utils.isBindCollision(be))
+    assert(Utils.isBindCollision(be1))
+    assert(Utils.isBindCollision(be2))
+
+    // Actual bind exception
+    var server1: ServerSocket = null
+    var server2: ServerSocket = null
+    try {
+      server1 = new java.net.ServerSocket(0)
+      server2 = new java.net.ServerSocket(server1.getLocalPort)
+    } catch {
+      case e: Exception =>
+        assert(e.isInstanceOf[java.net.BindException])
+        assert(Utils.isBindCollision(e))
+    } finally {
+      Option(server1).foreach(_.close())
+      Option(server2).foreach(_.close())
+    }
+  }
+
 }

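The nested-cause cases in the test above rely on walking the exception's cause chain. A standalone sketch of that walk, written tail-recursively, might look as follows. `BindCollisionSketch` is a hypothetical name, and unlike the commit's version this variant follows the cause of any `Throwable`, not only of `Exception`s.

```scala
import java.net.BindException

object BindCollisionSketch {
  // Hypothetical variant of Utils.isBindCollision: returns true if any
  // throwable in the cause chain is a BindException whose message
  // contains "Address already in use".
  @annotation.tailrec
  def isBindCollision(t: Throwable): Boolean = t match {
    case null => false
    case e: BindException
        if e.getMessage != null && e.getMessage.contains("Address already in use") => true
    case other => isBindCollision(other.getCause)
  }
}
```
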
http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7cd7f41..5e3eb0f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -566,6 +566,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>(local hostname)</td>
   <td>
     Hostname or IP address for the driver to listen on.
+    This is used for communicating with the executors and the standalone Master.
   </td>
 </tr>
 <tr>
@@ -573,6 +574,51 @@ Apart from these, the following properties are also available, and may be useful
   <td>(random)</td>
   <td>
     Port for the driver to listen on.
+    This is used for communicating with the executors and the standalone Master.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.fileserver.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the driver's HTTP file server to listen on.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.broadcast.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the driver's HTTP broadcast server to listen on.
+    This is not relevant for torrent broadcast.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.replClassServer.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the driver's HTTP class server to listen on.
+    This is only relevant for the Spark shell.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blockManager.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for all block managers to listen on. These exist on both the driver and the executors.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.port</code></td>
+  <td>(random)</td>
+  <td>
+    Port for the executor to listen on. This is used for communicating with the driver.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.ports.maxRetries</code></td>
+  <td>16</td>
+  <td>
+    Maximum number of retries when binding to a port before giving up.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index 8312f8d..ec05231 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -7,6 +7,9 @@ Spark currently supports authentication via a shared secret. Authentication can
 
 * For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. 
 * For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications.
+* **IMPORTANT NOTE:** *The experimental Netty shuffle path (`spark.shuffle.use.netty`) is not secured, so do not use Netty for shuffles if running with authentication.*
+
+## Web UI
 
 The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI.  On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.
 
@@ -14,10 +17,132 @@ Spark also supports modify ACLs to control who has access to modify a running Sp
 
 Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications.
 
+## Event Logging
+
 If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
 
-**IMPORTANT NOTE:** *The experimental Netty shuffle path (`spark.shuffle.use.netty`) is not secured, so do not use Netty for shuffles if running with authentication.*
+## Configuring Ports for Network Security
+
+Spark makes heavy use of the network, and some environments have strict requirements for using tight
+firewall settings.  Below are the primary ports that Spark uses for its communication and how to
+configure those ports.
+
+### Standalone mode only
+
+<table class="table">
+  <tr>
+    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
+    Setting</th><th>Notes</th>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>Standalone Master</td>
+    <td>8080</td>
+    <td>Web UI</td>
+    <td><code>spark.master.ui.port /<br> SPARK_MASTER_WEBUI_PORT</code></td>
+    <td>Jetty-based. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>Standalone Worker</td>
+    <td>8081</td>
+    <td>Web UI</td>
+    <td><code>spark.worker.ui.port /<br> SPARK_WORKER_WEBUI_PORT</code></td>
+    <td>Jetty-based. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Driver /<br> Standalone Worker</td>
+    <td>Standalone Master</td>
+    <td>7077</td>
+    <td>Submit job to cluster /<br> Join cluster</td>
+    <td><code>SPARK_MASTER_PORT</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+  </tr>
+  <tr>
+    <td>Standalone Master</td>
+    <td>Standalone Worker</td>
+    <td>(random)</td>
+    <td>Schedule executors</td>
+    <td><code>SPARK_WORKER_PORT</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+  </tr>
+</table>
+
+### All cluster managers
+
+<table class="table">
+  <tr>
+    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
+    Setting</th><th>Notes</th>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>Application</td>
+    <td>4040</td>
+    <td>Web UI</td>
+    <td><code>spark.ui.port</code></td>
+    <td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Browser</td>
+    <td>History Server</td>
+    <td>18080</td>
+    <td>Web UI</td>
+    <td><code>spark.history.ui.port</code></td>
+    <td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Executor /<br> Standalone Master</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>Connect to application /<br> Notify executor state changes</td>
+    <td><code>spark.driver.port</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Driver</td>
+    <td>Executor</td>
+    <td>(random)</td>
+    <td>Schedule tasks</td>
+    <td><code>spark.executor.port</code></td>
+    <td>Akka-based. Set to "0" to choose a port randomly.</td>
+  </tr>
+  <tr>
+    <td>Executor</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>File server for files and jars</td>
+    <td><code>spark.fileserver.port</code></td>
+    <td>Jetty-based</td>
+  </tr>
+  <tr>
+    <td>Executor</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>HTTP Broadcast</td>
+    <td><code>spark.broadcast.port</code></td>
+    <td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
+    instead.</td>
+  </tr>
+  <tr>
+    <td>Executor</td>
+    <td>Driver</td>
+    <td>(random)</td>
+    <td>Class file server</td>
+    <td><code>spark.replClassServer.port</code></td>
+    <td>Jetty-based. Only used in Spark shells.</td>
+  </tr>
+  <tr>
+    <td>Executor / Driver</td>
+    <td>Executor / Driver</td>
+    <td>(random)</td>
+    <td>Block Manager port</td>
+    <td><code>spark.blockManager.port</code></td>
+    <td>Raw socket via ServerSocketChannel</td>
+  </tr>
+</table>
 
-See the [configuration page](configuration.html) for more details on the security configuration parameters.
 
-See <a href="{{site.SPARK_GITHUB_URL}}/tree/master/core/src/main/scala/org/apache/spark/SecurityManager.scala"><code>org.apache.spark.SecurityManager</code></a> for implementation details about security.
+See the [configuration page](configuration.html) for more details on the security configuration
+parameters, and <a href="{{site.SPARK_GITHUB_URL}}/tree/master/core/src/main/scala/org/apache/spark/SecurityManager.scala">
+<code>org.apache.spark.SecurityManager</code></a> for implementation details about security.

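One consequence for firewall rules follows from the retry logic in `Utils.startServiceOnPort` in this commit: a service configured on port n may end up bound to any of n through n + maxRetries, so a rule may need to allow a range rather than a single port. A rough sketch of that arithmetic, where `PortRangeSketch` and `candidatePorts` are hypothetical names and the port values are taken from the `spark-defaults.conf` example in the commit message:

```scala
object PortRangeSketch {
  // Values copied from the spark-defaults.conf example in the commit message.
  val configured: Map[String, Int] = Map(
    "spark.driver.port" -> 5001,
    "spark.fileserver.port" -> 5011,
    "spark.broadcast.port" -> 5021,
    "spark.replClassServer.port" -> 5031,
    "spark.blockManager.port" -> 5041,
    "spark.executor.port" -> 5051
  )

  // Ports a service may end up on: n, n + 1, ..., n + maxRetries (mod 65536).
  // Port 0 is returned unchanged; it means "let the OS pick a port",
  // so no fixed firewall rule can cover it.
  def candidatePorts(startPort: Int, maxRetries: Int): Seq[Int] =
    if (startPort == 0) Seq(0)
    else (0 to maxRetries).map(offset => (startPort + offset) % 65536)

  def main(args: Array[String]): Unit =
    configured.toSeq.sortBy(_._2).foreach { case (key, port) =>
      // 16 is the non-test default for the retry count in this commit.
      val ports = candidatePorts(port, maxRetries = 16)
      println(s"$key: ${ports.head}-${ports.last}")
    }
}
```

With the default of 16 retries, `spark.driver.port 5001` can resolve to anything from 5001 through 5017, which overlaps the candidates for `spark.fileserver.port 5011`. Spacing the configured ports further apart, or lowering the retry count, keeps the ranges disjoint.
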
http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/docs/spark-standalone.md
----------------------------------------------------------------------
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 293a7ac..c791c81 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -299,97 +299,15 @@ You can run Spark alongside your existing Hadoop cluster by just launching it as
 
 # Configuring Ports for Network Security
 
-Spark makes heavy use of the network, and some environments have strict requirements for using tight
-firewall settings.  Below are the primary ports that Spark uses for its communication and how to
-configure those ports.
-
-<table class="table">
-  <tr>
-    <th>From</th><th>To</th><th>Default Port</th><th>Purpose</th><th>Configuration
-    Setting</th><th>Notes</th>
-  </tr>
-  <!-- Web UIs -->
-  <tr>
-    <td>Browser</td>
-    <td>Standalone Cluster Master</td>
-    <td>8080</td>
-    <td>Web UI</td>
-    <td><code>spark.master.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td>
-    <td>Driver</td>
-    <td>4040</td>
-    <td>Web UI</td>
-    <td><code>spark.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td>
-    <td>History Server</td>
-    <td>18080</td>
-    <td>Web UI</td>
-    <td><code>spark.history.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <tr>
-    <td>Browser</td>
-    <td>Worker</td>
-    <td>8081</td>
-    <td>Web UI</td>
-    <td><code>spark.worker.ui.port</code></td>
-    <td>Jetty-based</td>
-  </tr>
-  <!-- Cluster interactions -->
-  <tr>
-    <td>Application</td>
-    <td>Standalone Cluster Master</td>
-    <td>7077</td>
-    <td>Submit job to cluster</td>
-    <td><code>spark.driver.port</code></td>
-    <td>Akka-based.  Set to "0" to choose a port randomly</td>
-  </tr>
-  <tr>
-    <td>Worker</td>
-    <td>Standalone Cluster Master</td>
-    <td>7077</td>
-    <td>Join cluster</td>
-    <td><code>spark.driver.port</code></td>
-    <td>Akka-based.  Set to "0" to choose a port randomly</td>
-  </tr>
-  <tr>
-    <td>Application</td>
-    <td>Worker</td>
-    <td>(random)</td>
-    <td>Join cluster</td>
-    <td><code>SPARK_WORKER_PORT</code> (standalone cluster)</td>
-    <td>Akka-based</td>
-  </tr>
-
-  <!-- Other misc stuff -->
-  <tr>
-    <td>Driver and other Workers</td>
-    <td>Worker</td>
-    <td>(random)</td>
-    <td>
-      <ul>
-        <li>File server for file and jars</li>
-        <li>Http Broadcast</li>
-        <li>Class file server (Spark Shell only)</li>
-      </ul>
-    </td>
-    <td>None</td>
-    <td>Jetty-based.  Each of these services starts on a random port that cannot be configured</td>
-  </tr>
-
-</table>
+Spark makes heavy use of the network, and some environments have strict requirements for using
+tight firewall settings. For a complete list of ports to configure, see the
+[security page](security.html#configuring-ports-for-network-security).
 
 # High Availability
 
 By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark itself is resilient to losing work by moving it to other workers). However, the scheduler uses a Master to make scheduling decisions, and this (by default) creates a single point of failure: if the Master crashes, no new applications can be created. In order to circumvent this, we have two high availability schemes, detailed below.
 
-## Standby Masters with ZooKeeper
+# Standby Masters with ZooKeeper
 
 **Overview**
 
@@ -429,7 +347,7 @@ There's an important distinction to be made between "registering with a Master"
 
 Due to this property, new Masters can be created at any time, and the only thing you need to worry about is that _new_ applications and Workers can find it to register with in case it becomes the leader. Once registered, you're taken care of.
 
-## Single-Node Recovery with Local File System
+# Single-Node Recovery with Local File System
 
 **Overview**
 

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aac621f..40b5885 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -330,6 +330,8 @@ object TestSettings {
     fork := true,
     javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
+    javaOptions in Test += "-Dspark.ports.maxRetries=100",
+    javaOptions in Test += "-Dspark.ui.port=0",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,

http://git-wip-us.apache.org/repos/asf/spark/blob/09f7e458/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index f60bbb4..84b57cd 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -102,7 +102,8 @@ import org.apache.spark.util.Utils
 
     val virtualDirectory                              = new PlainFile(outputDir) // "directory" for classfiles
     /** Jetty server that will serve our classes to worker nodes */
-    val classServer                                   = new HttpServer(outputDir, new SecurityManager(conf))
+    val classServerPort                               = conf.getInt("spark.replClassServer.port", 0)
+    val classServer                                   = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
     private var currentSettings: Settings             = initialSettings
     var printResults                                  = true      // whether to print result lines
     var totalSilence                                  = false     // whether to print anything

