Repository: incubator-gearpump Updated Branches: refs/heads/master 73de3ce22 -> 8064313af
[GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized Hadoop/Yarn set-up Author: Timea Magyar <[email protected]> Closes #231 from titikakatoo/yarn_spnego_authentication. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/8064313a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/8064313a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/8064313a Branch: refs/heads/master Commit: 8064313afeee1d966ef033a637cfd58d1cca6617 Parents: 73de3ce Author: Timea Magyar <[email protected]> Authored: Sat Oct 21 07:24:13 2017 +0800 Committer: manuzhang <[email protected]> Committed: Sat Oct 21 07:24:23 2017 +0800 ---------------------------------------------------------------------- .../yarn/appmaster/YarnAppMaster.scala | 23 +------- .../yarn/client/AppMasterResolver.scala | 62 ++++++++++++++------ 2 files changed, 46 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8064313a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala index 1907a95..53e93f9 100644 --- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala @@ -18,14 +18,10 @@ package org.apache.gearpump.experiments.yarn.appmaster -import java.io.IOException import java.util.concurrent.TimeUnit - import akka.actor._ import akka.util.Timeout import com.typesafe.config.ConfigValueFactory 
-import org.apache.commons.httpclient.HttpClient -import org.apache.commons.httpclient.methods.GetMethod import org.apache.gearpump.cluster.ClientToMaster._ import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} @@ -35,7 +31,6 @@ import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util._ import org.slf4j.Logger - import scala.collection.JavaConverters._ import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -364,22 +359,8 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser { case class WorkerInfo(id: ContainerId, nodeId: NodeId) def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = { - val client = new HttpClient() - val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path" - val get = new GetMethod(appMasterPath) - var status = client.executeMethod(get) - - if (status != 200) { - // Sleeps a little bit, and try again - Thread.sleep(3000) - status = client.executeMethod(get) - } + import org.apache.gearpump.experiments.yarn.client.AppMasterResolver - if (status == 200) { - AkkaHelper.actorFor(system, get.getResponseBodyAsString) - } else { - throw new IOException("Fail to resolve AppMaster address, please make sure " + - s"${report.getTrackingURL} is accessible...") - } + AppMasterResolver.resolveAppMasterAddress(report, system) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8064313a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala index 90653e1..c05b4e2 100644 
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala @@ -19,14 +19,15 @@ package org.apache.gearpump.experiments.yarn.client import java.io.IOException - +import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets import akka.actor.{ActorRef, ActorSystem} -import org.apache.commons.httpclient.HttpClient -import org.apache.commons.httpclient.methods.GetMethod -import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId +import org.apache.commons.io.IOUtils +import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, ApplicationReport} import org.apache.gearpump.experiments.yarn.glue.YarnClient import org.apache.gearpump.util.{AkkaHelper, LogUtil} - +import org.apache.hadoop.hdfs.web.URLConnectionFactory +import org.apache.hadoop.yarn.conf.YarnConfiguration import scala.util.Try /** @@ -43,19 +44,8 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) { private def connect(appId: ApplicationId): ActorRef = { val report = yarnClient.getApplicationReport(appId) - val client = new HttpClient() - val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path" - LOG.info(s"appMasterPath=$appMasterPath") - val get = new GetMethod(appMasterPath) - val status = client.executeMethod(get) - if (status == 200) { - val response = get.getResponseBodyAsString - LOG.info("Successfully resolved AppMaster address: " + response) - AkkaHelper.actorFor(system, response) - } else { - throw new IOException("Fail to resolve AppMaster address, please make sure " + - s"${report.getTrackingURL} is accessible...") - } + + AppMasterResolver.resolveAppMasterAddress(report, system) } private def retry(fun: => ActorRef, times: Int): ActorRef = { @@ -75,3 +65,39 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) { result } } + +object 
AppMasterResolver { + val LOG = LogUtil.getLogger(getClass) + + def resolveAppMasterAddress(report: ApplicationReport, system: ActorSystem): ActorRef = { + val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path" + LOG.info(s"appMasterPath=$appMasterPath") + + val connectionFactory: URLConnectionFactory = URLConnectionFactory + .newDefaultURLConnectionFactory(new YarnConfiguration()) + val url: URL = new URL(appMasterPath) + val connection: HttpURLConnection = connectionFactory.openConnection(url) + .asInstanceOf[HttpURLConnection] + connection.setInstanceFollowRedirects(true) + + try { + connection.connect() + } catch { + case e: IOException => + LOG.error(s"Failed to connect to AppMaster" + e.getMessage) + } + + val status = connection.getResponseCode + if (status == 200) { + val stream: java.io.InputStream = connection.getInputStream + val response = IOUtils.toString(stream, StandardCharsets.UTF_8) + LOG.info("Successfully resolved AppMaster address: " + response) + connection.disconnect() + AkkaHelper.actorFor(system, response) + } else { + connection.disconnect() + throw new IOException("Fail to resolve AppMaster address, please make sure " + + s"${report.getTrackingURL} is accessible...") + } + } +}
