Repository: incubator-samza
Updated Branches:
  refs/heads/master 891a6f8ef -> bfd97fce0
SAMZA-175: Bind AM Jetty server to 0

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bfd97fce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bfd97fce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bfd97fce

Branch: refs/heads/master
Commit: bfd97fce0719fc662cd456c90c6c01f1a0017c15
Parents: 891a6f8
Author: Zhijie Shen <zshen at hortonworks dot com>
Authored: Tue Mar 18 23:29:09 2014 -0700
Committer: Jakob Homan <[email protected]>
Committed: Tue Mar 18 23:29:09 2014 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/SamzaAppMasterService.scala | 40 +++++++-------------
 .../org/apache/samza/webapp/WebAppServer.scala | 19 ++++++++--
 .../job/yarn/TestSamzaAppMasterService.scala | 2 +
 3 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index 82d90d4..ab13d43 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -19,7 +19,6 @@

 package org.apache.samza.job.yarn

-import org.apache.samza.util.Util
 import grizzled.slf4j.Logging
 import org.apache.samza.webapp._
 import org.apache.samza.config.Config
@@ -36,34 +35,23 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
   var webApp: WebAppServer = null

   override def onInit() {
-    // try starting the samza AM dashboard. try ten times, just in case we
-    // pick a port that's already in use.
-    for (i <- 0 until 10) {
-      val rpcPort = Util.randomBetween(10000, 50000)
-      val trackingPort = Util.randomBetween(10000, 50000)
-      info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
+    // try starting the samza AM dashboard at a random rpc and tracking port
+    info("Starting webapp at a random rpc and tracking port")

-      try {
-        rpcApp = new WebAppServer("/", rpcPort)
-        rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
-        rpcApp.start
+    rpcApp = new WebAppServer("/")
+    rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+    rpcApp.start

-        webApp = new WebAppServer("/", trackingPort)
-        webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
-        webApp.start
+    webApp = new WebAppServer("/")
+    webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+    webApp.start

-        state.rpcPort = rpcPort
-        state.trackingPort = trackingPort
-        return
-      } catch {
-        case e: Exception => {
-          warn("Unable to start webapp on rpc port %d, tracking port %d .. retrying" format (rpcPort, trackingPort))
-        }
-      }
-    }
-
-    if (state.rpcPort == 0 || state.trackingPort == 0) {
-      throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
+    state.rpcPort = rpcApp.port
+    state.trackingPort = webApp.port
+    if (state.rpcPort > 0 && state.trackingPort > 0) {
+      info("Webapp is started at rpc %d, tracking port %d" format (state.rpcPort, state.trackingPort))
+    } else {
+      throw new SamzaException("Unable to start webapp, since the host is out of ports")
     }
   }


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
index bb5c297..d524996 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/WebAppServer.scala
@@ -20,13 +20,15 @@
 package org.apache.samza.webapp

 import javax.servlet.Servlet
-import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.{ Connector, Server }
 import org.eclipse.jetty.servlet.{ DefaultServlet, ServletHolder }
 import org.eclipse.jetty.webapp.WebAppContext
+import org.apache.samza.SamzaException

-class WebAppServer(rootPath: String, port: Int) {
-  val server = new Server(port)
+class WebAppServer(rootPath: String) {
+  val server = new Server(0)
   val context = new WebAppContext
+  var port: Int = 0

   // add a default holder to deal with static files
   val defaultHolder = new ServletHolder(classOf[DefaultServlet])
@@ -43,9 +45,18 @@ class WebAppServer(rootPath: String, port: Int) {
   }

   def start {
-    context.setContextPath("/");
+    context.setContextPath("/")
     context.setResourceBase(getClass.getClassLoader.getResource("scalate").toExternalForm)
     server.setHandler(context)
     server.start
+    // retrieve the real port
+    try {
+      val connector : Connector = server.getConnectors()(0).asInstanceOf[Connector]
+      port = connector.getLocalPort
+    } catch {
+      case e: Throwable => {
+        throw new SamzaException("Error when getting the port", e)
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bfd97fce/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index 1099ca3..811c996 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -33,6 +33,8 @@ class TestSamzaAppMasterService {

     // start the dashboard
     service.onInit
+    assert(state.rpcPort > 0)
+    assert(state.trackingPort > 0)

     // check to see if it's running
     val url = new URL("http://127.0.0.1:%d/am" format state.rpcPort)
