Repository: spark
Updated Branches:
  refs/heads/master e5566e05b -> 30abef154


[SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA.

The existing code only considered one of the RMs when running in
Yarn HA mode, so requests could fail if the active RM was not the
one registered in the filter.

The change makes use of a new API added to Yarn that returns all
proxy addresses, and falls back to the old behavior if the API is
not present. While there, I also changed the code to build the proxy
URIs with the scheme (http or https) that Yarn reports it is using.
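
The fallback is implemented with plain Java reflection, so the same
build runs against both old and new Yarn versions without a
compile-time dependency on the new method. A minimal sketch of the
scheme probe, assuming only that WebAppUtils may or may not expose
getHttpSchemePrefix (the class and method names match the stable
YarnRMClientImpl diff below; the helper name is illustrative):

    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils

    // Ask Yarn for its scheme prefix ("http://" or "https://") via
    // reflection; fall back to plain http on older releases where
    // the method does not exist and getMethod() throws.
    def schemePrefix(conf: Configuration): String =
      Try(classOf[WebAppUtils]
            .getMethod("getHttpSchemePrefix", classOf[Configuration])
            .invoke(null, conf)
            .asInstanceOf[String])
        .getOrElse("http://")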

Since, in the case of multiple RMs, Yarn uses commas as a separator,
it was no longer possible to propagate this information through the
existing spark.<filter>.params property, which itself uses commas to
delimit different config params. Instead, each filter parameter is
now propagated through its own config entry
(spark.<filter>.param.<name>). I chose not to add these entries to
the documentation at this point since I don't believe users will set
them directly.
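
To illustrate the encoding, here is a hypothetical round trip
through SparkConf using the per-parameter entries (the host names
and values are made up; the "spark.<filter>.param." prefix matches
the code in this change):

    import org.apache.spark.SparkConf

    val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    // Values may now safely contain commas, e.g. the HA proxy lists.
    val params = Map(
      "PROXY_HOSTS" -> "rm1.example.com,rm2.example.com",
      "PROXY_URI_BASES" ->
        "http://rm1.example.com:8088/proxy/app,http://rm2.example.com:8088/proxy/app")

    // Writing side: one config entry per filter parameter.
    val conf = new SparkConf(false)
    params.foreach { case (k, v) => conf.set(s"spark.$filter.param.$k", v) }

    // Reading side (as in JettyUtils below): strip the prefix to
    // recover the original map.
    val prefix = s"spark.$filter.param."
    val recovered = conf.getAll
      .filter { case (k, _) => k.length > prefix.length && k.startsWith(prefix) }
      .map { case (k, v) => (k.substring(prefix.length), v) }
      .toMap
    assert(recovered == params)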

Author: Marcelo Vanzin <[email protected]>

Closes #2469 from vanzin/SPARK-3606 and squashes the following commits:

aeb458a [Marcelo Vanzin] Undelete needed import.
65e400d [Marcelo Vanzin] Remove unused import.
d121883 [Marcelo Vanzin] Use separate config for each param instead of json.
04bc156 [Marcelo Vanzin] Review feedback.
4d4d6b9 [Marcelo Vanzin] [SPARK-3606] [yarn] Correctly configure AmIpFilter for Yarn HA.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30abef15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30abef15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30abef15

Branch: refs/heads/master
Commit: 30abef154768e5c4c6062f3341933dbda990f6cc
Parents: e5566e0
Author: Marcelo Vanzin <[email protected]>
Authored: Fri Oct 3 13:18:35 2014 -0700
Committer: Andrew Or <[email protected]>
Committed: Fri Oct 3 13:18:35 2014 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedClusterMessage.scala   |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  8 +++---
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 14 +++++-----
 .../spark/deploy/yarn/YarnRMClientImpl.scala    |  8 ++++--
 .../spark/deploy/yarn/ApplicationMaster.scala   | 12 +++------
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  4 +--
 .../spark/deploy/yarn/YarnRMClientImpl.scala    | 28 +++++++++++++++++++-
 7 files changed, 53 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6abf6d9..fb8160a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
-  case class AddWebUIFilter(filterName:String, filterParams: String, proxyBase :String)
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
     extends CoarseGrainedClusterMessage
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 89089e7..59aed6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -275,15 +275,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
     if (proxyBase != null && proxyBase.nonEmpty) {
       System.setProperty("spark.ui.proxyBase", proxyBase)
     }
 
-    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+    val hasFilter = (filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty)
+    if (hasFilter) {
       logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
       conf.set("spark.ui.filters", filterName)
-      conf.set(s"spark.$filterName.params", filterParams)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
       scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 6b46892..2a27d49 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -21,9 +21,7 @@ import java.net.{InetSocketAddress, URL}
 import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
-import scala.annotation.tailrec
 import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
 import scala.xml.Node
 
 import org.eclipse.jetty.server.Server
@@ -147,15 +145,19 @@ private[spark] object JettyUtils extends Logging {
           val holder : FilterHolder = new FilterHolder()
           holder.setClassName(filter)
           // Get any parameters for each filter
-          val paramName = "spark." + filter + ".params"
-          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
-          params.foreach {
-            case param : String =>
+          conf.get("spark." + filter + ".params", 
"").split(',').map(_.trim()).toSet.foreach {
+            param: String =>
               if (!param.isEmpty) {
                 val parts = param.split("=")
                if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
              }
           }
+
+          val prefix = s"spark.$filter.param."
+          conf.getAll
+            .filter { case (k, v) => k.length() > prefix.length() && k.startsWith(prefix) }
+            .foreach { case (k, v) => holder.setInitParameter(k.substring(prefix.length()), v) }
+
          val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
            DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
          handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }

http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index acf2650..9bd1719 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -76,8 +76,12 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     resourceManager.finishApplicationMaster(finishReq)
   }
 
-  override def getProxyHostAndPort(conf: YarnConfiguration) =
-    YarnConfiguration.getProxyHostAndPort(conf)
+  override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val uriBase = "http://"; + proxy + proxyBase
+    Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+  }
 
   override def getMaxRegAttempts(conf: YarnConfiguration) =
     conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)

http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b51daeb..caceef5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -368,18 +368,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter() = {
-    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    val proxy = client.getProxyHostAndPort(yarnConf)
-    val parts = proxy.split(":")
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-
+    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    val params = client.getAmIpFilterParams(yarnConf, proxyBase)
     if (isDriver) {
       System.setProperty("spark.ui.filters", amFilter)
-      System.setProperty(s"spark.$amFilter.params", params)
+      params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
     } else {
-      actor ! AddWebUIFilter(amFilter, params, proxyBase)
+      actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index ed65e56..943dc56 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -59,8 +59,8 @@ trait YarnRMClient {
   /** Returns the attempt ID. */
   def getAttemptId(): ApplicationAttemptId
 
-  /** Returns the RM's proxy host and port. */
-  def getProxyHostAndPort(conf: YarnConfiguration): String
+  /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
+  def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
 
   /** Returns the maximum number of attempts to register the AM. */
   def getMaxRegAttempts(conf: YarnConfiguration): Int

http://git-wip-us.apache.org/repos/asf/spark/blob/30abef15/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index 54bc6b1..b581790 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -17,8 +17,13 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.{List => JList}
+
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
+import scala.util._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
@@ -69,7 +74,28 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
     appAttemptId
   }
 
-  override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+  override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+    // so not all stable releases have it.
+    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+        .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+    try {
+      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+        classOf[Configuration])
+      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+      val hosts = proxies.map { proxy => proxy.split(":")(0) }
+      val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> 
uriBases.mkString(","))
+    } catch {
+      case e: NoSuchMethodException =>
+        val proxy = WebAppUtils.getProxyHostAndPort(conf)
+        val parts = proxy.split(":")
+        val uriBase = prefix + proxy + proxyBase
+        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+    }
+  }
 
   override def getMaxRegAttempts(conf: YarnConfiguration) =
     conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)

