Repository: spark
Updated Branches:
  refs/heads/branch-1.1 35875e9ec -> 0d958f163

[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).

This is a backport of SPARK-3606 to branch-1.1. Some of the code had to be
duplicated since branch-1.1 doesn't have the cleanup work that was done to
the Yarn codebase.

I don't know whether the version issue in yarn/alpha/pom.xml was intentional,
but I couldn't compile the code without fixing it.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #2497 from vanzin/SPARK-3606-1.1 and squashes the following commits:

4fd3c27 [Marcelo Vanzin] Remove unused imports.
75cde8c [Marcelo Vanzin] Scala is weird.
b27ebda [Marcelo Vanzin] Review feedback.
72ceafb [Marcelo Vanzin] Undelete needed import.
61162a6 [Marcelo Vanzin] Use separate config for each param instead of json.
3b7205f [Marcelo Vanzin] Review feedback.
b3b3e50 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA (1.1 version).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d958f16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d958f16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d958f16

Branch: refs/heads/branch-1.1
Commit: 0d958f163014e2b612ec445c80dfe69ff29d9f1a
Parents: 35875e9
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Oct 17 00:53:15 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Fri Oct 17 00:53:15 2014 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedClusterMessage.scala   |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 12 +++--
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 15 +++---
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 12 ++---
 .../spark/deploy/yarn/ExecutorLauncher.scala    |  6 +--
 .../spark/deploy/yarn/YarnStableUtils.scala     | 54 ++++++++++++++++++++
 7 files changed, 76 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6abf6d9..fb8160a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
-  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
     extends CoarseGrainedClusterMessage
 
 }
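The signature change above is the heart of the fix: with Yarn HA there are
several proxy hosts and URI bases, and those values themselves contain commas,
so the old comma-separated "k=v" string could no longer be parsed
unambiguously. A standalone sketch of the failure mode follows; the rm1/rm2
hosts and proxy URIs are made-up examples, not values from this patch:

// Sketch only: why AddWebUIFilter now carries a Map instead of a packed string.
// With Yarn HA, PROXY_URI_BASES holds one URI per resource manager, so the
// value itself contains commas and naive splitting misparses it.
object FilterParamsSketch {
  def main(args: Array[String]): Unit = {
    val encoded =
      "PROXY_HOSTS=rm1,rm2,PROXY_URI_BASES=http://rm1:8088/proxy/app,http://rm2:8088/proxy/app"
    // Old decoding: "rm2" and the second URI come out as stray, valueless keys.
    encoded.split(',').foreach(kv => println(kv.split("=").toList))

    // New encoding: structured, no escaping needed.
    val params = Map(
      "PROXY_HOSTS" -> "rm1,rm2",
      "PROXY_URI_BASES" -> "http://rm1:8088/proxy/app,http://rm2:8088/proxy/app")
    params.foreach { case (k, v) => println(s"$k -> $v") }
  }
}
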
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 04046e2..e8a3a3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered resources / total expected resources) 
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio =
     math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
-  // if minRegisteredRatio has not yet been reached 
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
@@ -283,15 +283,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
 
   // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
     if (proxyBase != null && proxyBase.nonEmpty) {
       System.setProperty("spark.ui.proxyBase", proxyBase)
    }
-    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+    val hasFilter = (filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty)
+    if (hasFilter) {
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
-      conf.set(s"spark.$filterName.params", filterParams)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
       scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 6b46892..6339012 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -21,9 +21,7 @@ import java.net.{InetSocketAddress, URL}
 import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
-import scala.annotation.tailrec
 import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
 import org.eclipse.jetty.server.Server
@@ -148,14 +146,19 @@ private[spark] object JettyUtils extends Logging {
       holder.setClassName(filter)
       // Get any parameters for each filter
       val paramName = "spark." + filter + ".params"
-      val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-      params.foreach {
-        case param : String =>
+      val params = conf.get(paramName, "").split(',').map(_.trim()).toSet.foreach {
+        param : String =>
           if (!param.isEmpty) {
             val parts = param.split("=")
             if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
-          }
+         }
       }
+
+      val prefix = s"spark.$filter.param."
+      conf.getAll
+        .filter { case (k, v) => k.length() > prefix.length() && k.startsWith(prefix) }
+        .foreach { case (k, v) => holder.setInitParameter(k.substring(prefix.length()), v) }
+
       val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
         DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
       handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
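With the message carrying a Map, every parameter gets its own configuration key
of the form spark.<filter class>.param.<NAME>, which the new prefix scan in
JettyUtils feeds to the servlet filter. A minimal sketch of that lookup, using
a plain Scala Map where the real code uses SparkConf (the key convention is the
patch's; the hosts and URIs are made up):

// Sketch of the per-parameter lookup JettyUtils now performs, with a plain
// Map standing in for SparkConf.
object FilterPrefixScan {
  def main(args: Array[String]): Unit = {
    val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    val conf = Map(
      "spark.ui.filters" -> filter,
      s"spark.$filter.param.PROXY_HOSTS" -> "rm1,rm2",
      s"spark.$filter.param.PROXY_URI_BASES" ->
        "http://rm1:8088/proxy/app,http://rm2:8088/proxy/app")

    val prefix = s"spark.$filter.param."
    val initParams = conf.collect {
      case (k, v) if k.startsWith(prefix) && k.length > prefix.length =>
        k.substring(prefix.length) -> v
    }
    // Each pair would be handed to FilterHolder.setInitParameter(name, value).
    initParams.foreach { case (name, value) => println(s"$name = $value") }
  }
}
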
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 155dd88..e9289aa 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -185,7 +185,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val uriBase = "http://" + proxy + proxyBase
-    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilter = Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
     val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4d60c6..378304f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -130,14 +129,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private def addAmIpFilter() {
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     System.setProperty("spark.ui.filters", amFilter)
-    val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts : Array[String] = proxy.split(":")
-    val uriBase = "http://" + proxy +
-      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-    System.setProperty(
-      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val params = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
+    params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
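In yarn-cluster mode the ApplicationMaster above runs in the same JVM as the
driver, so publishing each parameter as a spark.* system property is enough: a
SparkConf constructed afterwards copies spark.* system properties at creation
time. A rough sketch of that hand-off (the property-name convention follows the
patch; the host and URI are illustrative):

// Sketch: how the AM-side System.setProperty calls surface on the driver.
object AmFilterHandoff {
  def main(args: Array[String]): Unit = {
    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    val params = Map(
      "PROXY_HOSTS" -> "rm1",
      "PROXY_URI_BASES" -> "http://rm1:8088/proxy/application_1_0001")

    System.setProperty("spark.ui.filters", amFilter)
    params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }

    // A SparkConf created after this point sees one entry per parameter,
    // which JettyUtils then turns into filter init parameters.
    sys.props.toSeq.filter(_._1.startsWith("spark.")).sorted.foreach(println)
  }
}
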
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index e093fe4..38e9f9c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -144,11 +143,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
   // add the yarn amIpFilter that Yarn requires for properly securing the UI
   private def addAmIpFilter() {
-    val proxy = WebAppUtils.getProxyHostAndPort(conf)
-    val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilter = YarnStableUtils.getAmIpFilterParams(yarnConf, proxyBase)
     val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/0d958f16/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
new file mode 100644
index 0000000..ea81faf
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnStableUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+private[yarn] object YarnStableUtils {
+
+  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
+    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+    // so not all stable releases have it.
+    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+      .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+    try {
+      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+        classOf[Configuration])
+      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+      val hosts = proxies.map { proxy => proxy.split(":")(0) }
+      val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+    } catch {
+      case e: NoSuchMethodException =>
+        val proxy = WebAppUtils.getProxyHostAndPort(conf)
+        val parts = proxy.split(":")
+        val uriBase = prefix + proxy + proxyBase
+        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+    }
+  }
+
+}
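YarnStableUtils resolves the WebAppUtils methods reflectively because
getHttpSchemePrefix and getProxyHostsAndPortsForAmFilter only exist in newer
Hadoop releases, while branch-1.1 must still compile and run against older
ones. Here is the same probe pattern in isolation; Target and its methods are
hypothetical stand-ins, and only the getMethod/NoSuchMethodException shape
mirrors the patch:

// Standalone sketch of the runtime capability probe used above.
object ReflectionProbe {
  class Target {
    def oldApi(): String = "old result"
    def newApi(): String = "new result" // delete this to simulate an old library
  }

  def main(args: Array[String]): Unit = {
    val target = new Target
    val result =
      try {
        // Probe for the newer method; getMethod throws NoSuchMethodException
        // when the class on the classpath predates the API.
        classOf[Target].getMethod("newApi").invoke(target).asInstanceOf[String]
      } catch {
        case _: NoSuchMethodException => target.oldApi()
      }
    println(result)
  }
}

Catching NoSuchMethodException, rather than comparing Hadoop version strings,
ties the check to the actual API surface, which is what lets a single Spark
build span multiple Hadoop 2.x lines.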