Repository: spark
Updated Branches:
  refs/heads/master 7dc3e697c -> a18d63711


[SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn to core

## What changes were proposed in this pull request?

Move Hadoop delegation token code from `spark-yarn` to `spark-core`, so that 
other schedulers (such as Mesos) may use it.  In order to avoid exposing 
Hadoop interfaces in spark-core, the new Hadoop delegation token classes are 
kept private.  In order to provide backward compatibility, and to allow YARN 
users to continue to load their own delegation token providers via Java service 
loading, the old YARN interfaces, as well as the client code that uses them, 
have been retained.

Summary:
- Moved the registered `yarn.security.ServiceCredentialProvider` classes from 
`spark-yarn` to `spark-core`, into a new, private hierarchy under 
`HadoopDelegationTokenProvider`.  Client code in `HadoopDelegationTokenManager` 
now loads credentials from a whitelist of three providers 
(`HadoopFSDelegationTokenProvider`, `HiveDelegationTokenProvider`, 
`HBaseDelegationTokenProvider`) instead of using service loading, which means 
that users cannot implement their own delegation token providers in 
`spark-core`, as they can in the `spark-yarn` module.

- The `yarn.security.ServiceCredentialProvider` interface has been kept for 
backwards compatibility, and to continue to allow YARN users to implement their 
own delegation token providers.  Client code in YARN now fetches tokens via the 
new `YARNHadoopDelegationTokenManager` class, which fetches tokens from the 
core providers through `HadoopDelegationTokenManager` and additionally 
service-loads any providers registered as 
`yarn.security.ServiceCredentialProvider` implementations.  A sketch of such a 
user-defined provider follows this list.
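
For illustration, here is a minimal sketch of a user-defined YARN provider under the retained interface.  The class name, package, and token logic are hypothetical; the three methods match the existing `ServiceCredentialProvider` trait, and the implementation would be registered by listing its name in the jar's `META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` file:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical provider for some Kerberos-secured service, discovered via
// Java service loading exactly as before this change.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override def serviceName: String = "myservice"

  // Only fetch tokens when the cluster is actually secured.
  override def credentialsRequired(hadoopConf: Configuration): Boolean =
    UserGroupInformation.isSecurityEnabled

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch a delegation token for the service and add it to `creds` here.
    // Return the time of the next renewal, or None if the token is not
    // renewable.
    None
  }
}
```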

Old Hierarchy:

```
yarn.security.ServiceCredentialProvider (service loaded)
  HadoopFSCredentialProvider
  HiveCredentialProvider
  HBaseCredentialProvider
yarn.security.ConfigurableCredentialManager
```

New Hierarchy:

```
HadoopDelegationTokenManager
HadoopDelegationTokenProvider (not service loaded)
  HadoopFSDelegationTokenProvider
  HiveDelegationTokenProvider
  HBaseDelegationTokenProvider

yarn.security.ServiceCredentialProvider (service loaded)
yarn.security.YARNHadoopDelegationTokenManager
```
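
As a rough usage sketch of the new core entry point (assuming a `SparkConf` and a Hadoop `Configuration` are already in hand; note that `HadoopDelegationTokenManager` is `private[spark]`, so only Spark-internal code such as a scheduler backend can call it):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager

val sparkConf = new SparkConf()
val hadoopConf = new Configuration()

// Fetch tokens for the default filesystem; a scheduler passes whichever
// filesystems it needs to access.
val manager = new HadoopDelegationTokenManager(
  sparkConf,
  hadoopConf,
  Set(FileSystem.get(hadoopConf)))

val creds = new Credentials()
// Returns the earliest time at which any fetched token must be renewed
// (Long.MaxValue if none of the tokens are renewable).
val nextRenewal: Long = manager.obtainDelegationTokens(hadoopConf, creds)
```
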
## How was this patch tested?

unit tests

Author: Michael Gummelt <mgumm...@mesosphere.io>
Author: Dr. Stefan Schimanski <st...@mesosphere.io>

Closes #17723 from mgummelt/SPARK-20434-refactor-kerberos.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a18d6371
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a18d6371
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a18d6371

Branch: refs/heads/master
Commit: a18d637112b97d2caaca0a8324bdd99086664b24
Parents: 7dc3e69
Author: Michael Gummelt <mgumm...@mesosphere.io>
Authored: Thu Jun 15 11:46:00 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Jun 15 11:46:00 2017 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |  28 ++++
 .../security/HBaseDelegationTokenProvider.scala |  74 +++++++++
 .../security/HadoopDelegationTokenManager.scala | 119 +++++++++++++++
 .../HadoopDelegationTokenProvider.scala         |  50 +++++++
 .../HadoopFSDelegationTokenProvider.scala       | 126 ++++++++++++++++
 .../security/HiveDelegationTokenProvider.scala  | 121 +++++++++++++++
 .../HadoopDelegationTokenManagerSuite.scala     | 116 ++++++++++++++
 dev/.rat-excludes                               |   5 +-
 docs/running-on-yarn.md                         |  12 +-
 resource-managers/yarn/pom.xml                  |  14 +-
 ...ploy.yarn.security.ServiceCredentialProvider |   3 -
 .../spark/deploy/yarn/ApplicationMaster.scala   |  10 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   9 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  31 +++-
 .../yarn/security/AMCredentialRenewer.scala     |   6 +-
 .../ConfigurableCredentialManager.scala         | 107 -------------
 .../yarn/security/CredentialUpdater.scala       |   2 +-
 .../yarn/security/HBaseCredentialProvider.scala |  75 ----------
 .../security/HadoopFSCredentialProvider.scala   | 120 ---------------
 .../yarn/security/HiveCredentialProvider.scala  | 129 ----------------
 .../security/ServiceCredentialProvider.scala    |   3 +-
 .../YARNHadoopDelegationTokenManager.scala      |  83 ++++++++++
 ...ploy.yarn.security.ServiceCredentialProvider |   2 +-
 .../ConfigurableCredentialManagerSuite.scala    | 150 -------------------
 .../HadoopFSCredentialProviderSuite.scala       |  70 ---------
 .../YARNHadoopDelegationTokenManagerSuite.scala |  66 ++++++++
 26 files changed, 844 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 7f245b5..326dde4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -357,6 +357,34 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
     </dependency>
+
+    <!--
+     The following dependencies are depended upon in HiveCredentialProvider, but are only executed if Hive is enabled in
+     the user's Hadoop configuration.  So in order to prevent spark-core from depending on Hive, these deps have been
+     placed in the "provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are handled
+     when the user has not explicitly compiled with the Hive module.
+    -->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
new file mode 100644
index 0000000..35621da
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[security] class HBaseDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "hbase"
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      creds: Credentials): Option[Long] = {
+    try {
+      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+      val obtainToken = mirror.classLoader.
+        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
+        getMethod("obtainToken", classOf[Configuration])
+
+      logDebug("Attempting to fetch HBase security token.")
+      val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+      logInfo(s"Get token from HBase: ${token.toString}")
+      creds.addToken(token.getService, token)
+    } catch {
+      case NonFatal(e) =>
+        logDebug(s"Failed to get token from service $serviceName", e)
+    }
+
+    None
+  }
+
+  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+    hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
+  }
+
+  private def hbaseConf(conf: Configuration): Configuration = {
+    try {
+      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+      val confCreate = mirror.classLoader.
+        loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
+        getMethod("create", classOf[Configuration])
+      confCreate.invoke(null, conf).asInstanceOf[Configuration]
+    } catch {
+      case NonFatal(e) =>
+        logDebug("Fail to invoke HBaseConfiguration", e)
+        conf
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
new file mode 100644
index 0000000..89b6f52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+/**
+ * Manages all the registered HadoopDelegationTokenProviders and offers APIs for other modules to
+ * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
+ * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
+ * explicitly disabled.
+ *
+ * Also, each HadoopDelegationTokenProvider is controlled by
+ * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
+ * false.  For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
+ * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
+ *
+ * @param sparkConf Spark configuration
+ * @param hadoopConf Hadoop configuration
+ * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
+ */
+private[spark] class HadoopDelegationTokenManager(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
+    fileSystems: Set[FileSystem])
+  extends Logging {
+
+  private val deprecatedProviderEnabledConfigs = List(
+    "spark.yarn.security.tokens.%s.enabled",
+    "spark.yarn.security.credentials.%s.enabled")
+  private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
+
+  // Maintain all the registered delegation token providers
+  private val delegationTokenProviders = getDelegationTokenProviders
+  logDebug(s"Using the following delegation token providers: " +
+    s"${delegationTokenProviders.keys.mkString(", ")}.")
+
+  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
+    val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
+      new HiveDelegationTokenProvider,
+      new HBaseDelegationTokenProvider)
+
+    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
+    providers
+      .filter { p => isServiceEnabled(p.serviceName) }
+      .map { p => (p.serviceName, p) }
+      .toMap
+  }
+
+  def isServiceEnabled(serviceName: String): Boolean = {
+    val key = providerEnabledConfig.format(serviceName)
+
+    deprecatedProviderEnabledConfigs.foreach { pattern =>
+      val deprecatedKey = pattern.format(serviceName)
+      if (sparkConf.contains(deprecatedKey)) {
+        logWarning(s"${deprecatedKey} is deprecated.  Please use ${key} instead.")
+      }
+    }
+
+    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
+      sparkConf
+        .getOption(pattern.format(serviceName))
+        .map(_.toBoolean)
+        .getOrElse(true)
+    }
+
+    sparkConf
+      .getOption(key)
+      .map(_.toBoolean)
+      .getOrElse(isEnabledDeprecated)
+  }
+
+  /**
+   * Get delegation token provider for the specified service.
+   */
+  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
+    delegationTokenProviders.get(service)
+  }
+
+  /**
+   * Writes delegation tokens to creds.  Delegation tokens are fetched from all registered
+   * providers.
+   *
+   * @return Time after which the fetched delegation tokens should be renewed.
+   */
+  def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      creds: Credentials): Long = {
+    delegationTokenProviders.values.flatMap { provider =>
+      if (provider.delegationTokensRequired(hadoopConf)) {
+        provider.obtainDelegationTokens(hadoopConf, creds)
+      } else {
+        logDebug(s"Service ${provider.serviceName} does not require a token." +
+          s" Check your configuration to see if security is disabled or not.")
+        None
+      }
+    }.foldLeft(Long.MaxValue)(math.min)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
new file mode 100644
index 0000000..f162e7e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+/**
+ * Hadoop delegation token provider.
+ */
+private[spark] trait HadoopDelegationTokenProvider {
+
+  /**
+   * Name of the service to provide delegation tokens. This name should be unique.  Spark will
+   * internally use this name to differentiate delegation token providers.
+   */
+  def serviceName: String
+
+  /**
+   * Returns true if delegation tokens are required for this service. By default, it is based on
+   * whether Hadoop security is enabled.
+   */
+  def delegationTokensRequired(hadoopConf: Configuration): Boolean
+
+  /**
+   * Obtain delegation tokens for this service and get the time of the next renewal.
+   * @param hadoopConf Configuration of current Hadoop Compatible system.
+   * @param creds Credentials to add tokens and security keys to.
+   * @return If the returned tokens are renewable and can be renewed, return the time of the next
+   *         renewal, otherwise None should be returned.
+   */
+  def obtainDelegationTokens(
+    hadoopConf: Configuration,
+    creds: Credentials): Option[Long]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
new file mode 100644
index 0000000..13157f3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+
+private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
+    extends HadoopDelegationTokenProvider with Logging {
+
+  // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
+  // If None, no token renewer is specified or no token can be renewed,
+  // so we cannot get the token renewal interval.
+  private var tokenRenewalInterval: Option[Long] = null
+
+  override val serviceName: String = "hadoopfs"
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      creds: Credentials): Option[Long] = {
+
+    val newCreds = fetchDelegationTokens(
+      getTokenRenewer(hadoopConf),
+      fileSystems)
+
+    // Get the token renewal interval if it is not set. It will only be called once.
+    if (tokenRenewalInterval == null) {
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
+    }
+
+    // Get the time of next renewal.
+    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+      val nextRenewalDates = newCreds.getAllTokens.asScala
+        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
+        .map { token =>
+          val identifier = token
+            .decodeIdentifier()
+            .asInstanceOf[AbstractDelegationTokenIdentifier]
+          identifier.getIssueDate + interval
+        }
+      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
+    }
+
+    creds.addAll(newCreds)
+    nextRenewalDate
+  }
+
+  def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+    UserGroupInformation.isSecurityEnabled
+  }
+
+  private def getTokenRenewer(hadoopConf: Configuration): String = {
+    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
+    logDebug("Delegation token renewer is: " + tokenRenewer)
+
+    if (tokenRenewer == null || tokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+
+    tokenRenewer
+  }
+
+  private def fetchDelegationTokens(
+      renewer: String,
+      filesystems: Set[FileSystem]): Credentials = {
+
+    val creds = new Credentials()
+
+    filesystems.foreach { fs =>
+      logInfo("getting token for: " + fs)
+      fs.addDelegationTokens(renewer, creds)
+    }
+
+    creds
+  }
+
+  private def getTokenRenewalInterval(
+      hadoopConf: Configuration,
+      filesystems: Set[FileSystem]): Option[Long] = {
+    // We cannot use the tokens generated with renewer yarn. Trying to renew
+    // those will fail with an access control issue. So create new tokens with the logged in
+    // user as renewer.
+    val creds = fetchDelegationTokens(
+      UserGroupInformation.getCurrentUser.getUserName,
+      filesystems)
+
+    val renewIntervals = creds.getAllTokens.asScala.filter {
+      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+    }.flatMap { token =>
+      Try {
+        val newExpiration = token.renew(hadoopConf)
+        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+        val interval = newExpiration - identifier.getIssueDate
+        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+        interval
+      }.toOption
+    }
+    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
new file mode 100644
index 0000000..53b9f89
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.security.PrivilegedExceptionAction
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[security] class HiveDelegationTokenProvider
+    extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "hive"
+
+  private val classNotFoundErrorStr = s"You are attempting to use the " +
+    s"${getClass.getCanonicalName}, but your Spark distribution is not built 
with Hive libraries."
+
+  private def hiveConf(hadoopConf: Configuration): Configuration = {
+    try {
+      new HiveConf(hadoopConf, classOf[HiveConf])
+    } catch {
+      case NonFatal(e) =>
+        logDebug("Fail to create Hive Configuration", e)
+        hadoopConf
+      case e: NoClassDefFoundError =>
+        logWarning(classNotFoundErrorStr)
+        hadoopConf
+    }
+  }
+
+  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+    UserGroupInformation.isSecurityEnabled &&
+      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
+  }
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      creds: Credentials): Option[Long] = {
+    try {
+      val conf = hiveConf(hadoopConf)
+
+      val principalKey = "hive.metastore.kerberos.principal"
+      val principal = conf.getTrimmed(principalKey, "")
+      require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+      val metastoreUri = conf.getTrimmed("hive.metastore.uris", "")
+      require(metastoreUri.nonEmpty, "Hive metastore uri undefined")
+
+      val currentUser = UserGroupInformation.getCurrentUser()
+      logDebug(s"Getting Hive delegation token for 
${currentUser.getUserName()} against " +
+        s"$principal at $metastoreUri")
+
+      doAsRealUser {
+        val hive = Hive.get(conf, classOf[HiveConf])
+        val tokenStr = hive.getDelegationToken(currentUser.getUserName(), principal)
+
+        val hive2Token = new Token[DelegationTokenIdentifier]()
+        hive2Token.decodeFromUrlString(tokenStr)
+        logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
+        creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
+      }
+
+      None
+    } catch {
+      case NonFatal(e) =>
+        logDebug(s"Failed to get token from service $serviceName", e)
+        None
+      case e: NoClassDefFoundError =>
+        logWarning(classNotFoundErrorStr)
+        None
+    } finally {
+      Utils.tryLogNonFatalError {
+        Hive.closeCurrent()
+      }
+    }
+  }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  private def doAsRealUser[T](fn: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+   // For some reason the Scala-generated anonymous class ends up causing an
+   // UndeclaredThrowableException, even if you annotate the method with @throws.
+   try {
+      realUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
new file mode 100644
index 0000000..335f344
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.Credentials
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
+  private var delegationTokenManager: HadoopDelegationTokenManager = null
+  private var sparkConf: SparkConf = null
+  private var hadoopConf: Configuration = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sparkConf = new SparkConf()
+    hadoopConf = new Configuration()
+  }
+
+  test("Correctly load default credential providers") {
+    delegationTokenManager = new HadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      hadoopFSsToAccess(hadoopConf))
+
+    delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") 
should not be (None)
+    delegationTokenManager.getServiceDelegationTokenProvider("hbase") should 
not be (None)
+    delegationTokenManager.getServiceDelegationTokenProvider("hive") should 
not be (None)
+    delegationTokenManager.getServiceDelegationTokenProvider("bogus") should 
be (None)
+  }
+
+  test("disable hive credential provider") {
+    sparkConf.set("spark.security.credentials.hive.enabled", "false")
+    delegationTokenManager = new HadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      hadoopFSsToAccess(hadoopConf))
+
+    delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") 
should not be (None)
+    delegationTokenManager.getServiceDelegationTokenProvider("hbase") should 
not be (None)
+    delegationTokenManager.getServiceDelegationTokenProvider("hive") should be 
(None)
+  }
+
+  test("using deprecated configurations") {
+    sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
+    sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
+    delegationTokenManager = new HadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      hadoopFSsToAccess(hadoopConf))
+
+    delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") 
should be (None)
+    delegationTokenManager.getServiceDelegationTokenProvider("hive") should be 
(None)
+    delegationTokenManager.getServiceDelegationTokenProvider("hbase") should 
not be (None)
+  }
+
+  test("verify no credentials are obtained") {
+    delegationTokenManager = new HadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      hadoopFSsToAccess(hadoopConf))
+    val creds = new Credentials()
+
+    // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.
+    delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
+    val tokens = creds.getAllTokens
+    tokens.size() should be (0)
+  }
+
+  test("obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // thrift picks up on port 0 and bails out, without trying to talk to endpoint
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+
+    val hiveCredentialProvider = new HiveDelegationTokenProvider()
+    val credentials = new Credentials()
+    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, credentials)
+
+    credentials.getAllTokens.size() should be (0)
+  }
+
+  test("Obtain tokens For HBase") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hbase.security.authentication", "kerberos")
+
+    val hbaseTokenProvider = new HBaseDelegationTokenProvider()
+    val creds = new Credentials()
+    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, creds)
+
+    creds.getAllTokens.size should be (0)
+  }
+
+  private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = {
+    Set(FileSystem.get(hadoopConf))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 2355d40..607234b 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -93,16 +93,13 @@ INDEX
 .lintr
 gen-java.*
 .*avpr
-org.apache.spark.sql.sources.DataSourceRegister
-org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet
 spark-deps-.*
 .*csv
 .*tsv
-org.apache.spark.scheduler.ExternalClusterManager
 .*\.sql
 .Rbuildignore
-org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+META-INF/*
 spark-warehouse
 structured-streaming/*
 kafka-source-initial-offset-version-2.1.0.bin

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 2d56123..e4a7455 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -419,7 +419,7 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.security.credentials.${service}.enabled</code></td>
+  <td><code>spark.security.credentials.${service}.enabled</code></td>
   <td><code>true</code></td>
   <td>
   Controls whether to obtain credentials for services when security is enabled.
@@ -482,11 +482,11 @@ token for the cluster's default Hadoop filesystem, and potentially for HBase and
 
 An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
-and `spark.yarn.security.credentials.hbase.enabled` is not set to `false`.
+and `spark.security.credentials.hbase.enabled` is not set to `false`.
 
 Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
 includes a URI of the metadata store in `hive.metastore.uris`, and
-`spark.yarn.security.credentials.hive.enabled` is not set to `false`.
+`spark.security.credentials.hive.enabled` is not set to `false`.
 
 If an application needs to interact with other secure Hadoop filesystems, then
 the tokens needed to access these clusters must be explicitly requested at
@@ -500,7 +500,7 @@ Spark supports integrating with other security-aware services through Java Servi
 `java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
 should be available to Spark by listing their names in the corresponding file in the jar's
 `META-INF/services` directory. These plug-ins can be disabled by setting
-`spark.yarn.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of
+`spark.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of
 credential provider.
 
 ## Configuring the External Shuffle Service
@@ -564,8 +564,8 @@ the Spark configuration must be set to disable token collection for the services
 The Spark configuration must include the lines:
 
 ```
-spark.yarn.security.credentials.hive.enabled   false
-spark.yarn.security.credentials.hbase.enabled  false
+spark.security.credentials.hive.enabled   false
+spark.security.credentials.hbase.enabled  false
 ```
 
 The configuration option `spark.yarn.access.hadoopFileSystems` must be unset.

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index 71d4ad6..43a7ce9 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -167,29 +167,27 @@
        <version>${jersey-1.version}</version>
      </dependency>
 
-    <!--
-     Testing Hive reflection needs hive on the test classpath only.
-     It doesn't need the spark hive modules, so the -Phive flag is not checked.
-      -->
+    <!-- These dependencies are duplicated from core, because dependencies in 
the "provided"
+    scope are not transitive.-->
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-exec</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-metastore</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libfb303</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
deleted file mode 100644
index f5a807e..0000000
--- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ /dev/null
@@ -1,3 +0,0 @@
-org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
-org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
-org.apache.spark.deploy.yarn.security.HiveCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6da2c0b..4f71a16 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -38,7 +38,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, YARNHadoopDelegationTokenManager}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
@@ -247,8 +247,12 @@ private[spark] class ApplicationMaster(
       if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
         // If a principal and keytab have been set, use that to create new credentials for executors
         // periodically
-        credentialRenewer =
-          new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+        val credentialManager = new YARNHadoopDelegationTokenManager(
+          sparkConf,
+          yarnConf,
+          YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+
+        val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
         credentialRenewer.scheduleLoginFromKeytab()
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1fb7edf..e5131e6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -121,7 +121,10 @@ private[spark] class Client(
   private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
     .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
 
-  private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+  private val credentialManager = new YARNHadoopDelegationTokenManager(
+    sparkConf,
+    hadoopConf,
+    YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
 
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
@@ -368,7 +371,7 @@ private[spark] class Client(
     val fs = destDir.getFileSystem(hadoopConf)
 
     // Merge credentials obtained from registered providers
-    val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
+    val nearestTimeOfNextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, credentials)
 
     if (credentials != null) {
       // Add credentials to current user's UGI, so that following operations don't need to use the

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 0fc994d..4522071 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -24,8 +24,9 @@ import java.util.regex.Pattern
 import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{JobConf, Master}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants
@@ -35,11 +36,14 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.CredentialUpdater
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils
 
+
 /**
  * Contains util methods to interact with Hadoop from spark.
  */
@@ -87,8 +91,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   }
 
   private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = {
-    credentialUpdater =
-      new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
+    val hadoopConf = newConfiguration(sparkConf)
+    val credentialManager = new YARNHadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+    credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
     credentialUpdater.start()
   }
 
@@ -103,6 +111,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     ConverterUtils.toContainerId(containerIdString)
   }
+
+  /** The filesystems for which YARN should fetch delegation tokens. */
+  private[spark] def hadoopFSsToAccess(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Set[FileSystem] = {
+    val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
+      .map(new Path(_).getFileSystem(hadoopConf))
+      .toSet
+
+    val stagingFS = sparkConf.get(STAGING_DIR)
+      .map(new Path(_).getFileSystem(hadoopConf))
+      .getOrElse(FileSystem.get(hadoopConf))
+
+    filesystemsToAccess + stagingFS
+  }
 }
 
 object YarnSparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 7e76f40..68a2e9e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -54,7 +54,7 @@ import org.apache.spark.util.ThreadUtils
 private[yarn] class AMCredentialRenewer(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    credentialManager: ConfigurableCredentialManager) extends Logging {
+    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
 
   private var lastCredentialsFileSuffix = 0
 
@@ -174,7 +174,9 @@ private[yarn] class AMCredentialRenewer(
     keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
       // Get a copy of the credentials
       override def run(): Void = {
-        nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds)
+        nearestNextRenewalTime = credentialManager.obtainDelegationTokens(
+          freshHadoopConf,
+          tempCreds)
         null
       }
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
deleted file mode 100644
index 4f4be52..0000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A ConfigurableCredentialManager to manage all the registered credential providers and offer
- * APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
- * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
- * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
- * interface and put into resources/META-INF/services to be loaded by ServiceLoader.
- *
- * Also each credential provider is controlled by
- * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false.
- * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
- * the configuration spark.yarn.security.credentials.hive.enabled.
- */
-private[yarn] final class ConfigurableCredentialManager(
-    sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
-  private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
-  private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
-
-  // Maintain all the registered credential providers
-  private val credentialProviders = {
-    val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
-      Utils.getContextOrSparkClassLoader).asScala
-
-    // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
-    providers.filter { p =>
-      sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
-        .orElse {
-          sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
-            logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
-              s"using ${providerEnabledConfig.format(p.serviceName)} instead")
-            c
-          }
-        }.map(_.toBoolean).getOrElse(true)
-    }.map { p => (p.serviceName, p) }.toMap
-  }
-
-  /**
-   * Get credential provider for the specified service.
-   */
-  def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
-    credentialProviders.get(service)
-  }
-
-  /**
-   * Obtain credentials from all the registered providers.
-   * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
-   *         otherwise the nearest renewal time of any credentials will be returned.
-   */
-  def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
-    credentialProviders.values.flatMap { provider =>
-      if (provider.credentialsRequired(hadoopConf)) {
-        provider.obtainCredentials(hadoopConf, sparkConf, creds)
-      } else {
-        logDebug(s"Service ${provider.serviceName} does not require a token." +
-          s" Check your configuration to see if security is disabled or not.")
-        None
-      }
-    }.foldLeft(Long.MaxValue)(math.min)
-  }
-
-  /**
-   * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
-   * instance when it is not used. AM will use it to renew credentials periodically.
-   */
-  def credentialRenewer(): AMCredentialRenewer = {
-    new AMCredentialRenewer(sparkConf, hadoopConf, this)
-  }
-
-  /**
-   * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
-   * when it is not used. Executors and driver (client mode) will use it to update credentials.
-   * periodically.
-   */
-  def credentialUpdater(): CredentialUpdater = {
-    new CredentialUpdater(sparkConf, hadoopConf, this)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
index 41b7b5d..fe173df 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 private[spark] class CredentialUpdater(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    credentialManager: ConfigurableCredentialManager) extends Logging {
+    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
 
   @volatile private var lastCredentialsFileSuffix = 0
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
deleted file mode 100644
index 5adeb8e..0000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import scala.reflect.runtime.universe
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
-
-  override def serviceName: String = "hbase"
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    try {
-      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-      val obtainToken = mirror.classLoader.
-        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
-        getMethod("obtainToken", classOf[Configuration])
-
-      logDebug("Attempting to fetch HBase security token.")
-      val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
-        .asInstanceOf[Token[_ <: TokenIdentifier]]
-      logInfo(s"Get token from HBase: ${token.toString}")
-      creds.addToken(token.getService, token)
-    } catch {
-      case NonFatal(e) =>
-        logDebug(s"Failed to get token from service $serviceName", e)
-    }
-
-    None
-  }
-
-  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
-    hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
-  }
-
-  private def hbaseConf(conf: Configuration): Configuration = {
-    try {
-      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-      val confCreate = mirror.classLoader.
-        loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
-        getMethod("create", classOf[Configuration])
-      confCreate.invoke(null, conf).asInstanceOf[Configuration]
-    } catch {
-      case NonFatal(e) =>
-        logDebug("Fail to invoke HBaseConfiguration", e)
-        conf
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
deleted file mode 100644
index f65c886..0000000
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.security.Credentials
-import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-
-private[security] class HadoopFSCredentialProvider
-    extends ServiceCredentialProvider with Logging {
-  // Token renewal interval. This value is set on the first call; if it is None, either no
-  // token renewer was specified or no token could be renewed, so the renewal interval
-  // cannot be determined.
-  private var tokenRenewalInterval: Option[Long] = null
-
-  override val serviceName: String = "hadoopfs"
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    // NameNode to access, used to get tokens from different FileSystems
-    val tmpCreds = new Credentials()
-    val tokenRenewer = getTokenRenewer(hadoopConf)
-    hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
-      val dstFs = dst.getFileSystem(hadoopConf)
-      logInfo("getting token for: " + dst)
-      dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
-    }
-
-    // Get the token renewal interval if it is not yet set; this happens only on the first call.
-    if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
-    }
-
-    // Get the time of next renewal.
-    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
-      val nextRenewalDates = tmpCreds.getAllTokens.asScala
-        
.filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
-        .map { t =>
-          val identifier = 
t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-          identifier.getIssueDate + interval
-        }
-      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
-    }
-
-    creds.addAll(tmpCreds)
-    nextRenewalDate
-  }
-
-  private def getTokenRenewalInterval(
-      hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
-    // We cannot use the tokens generated with renewer yarn. Trying to renew
-    // those will fail with an access control issue. So create new tokens with 
the logged in
-    // user as renewer.
-    sparkConf.get(PRINCIPAL).flatMap { renewer =>
-      val creds = new Credentials()
-      hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
-        val dstFs = dst.getFileSystem(hadoopConf)
-        dstFs.addDelegationTokens(renewer, creds)
-      }
-
-      val renewIntervals = creds.getAllTokens.asScala.filter {
-        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-      }.flatMap { token =>
-        Try {
-          val newExpiration = token.renew(hadoopConf)
-          val identifier = 
token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-          val interval = newExpiration - identifier.getIssueDate
-          logInfo(s"Renewal interval is $interval for token 
${token.getKind.toString}")
-          interval
-        }.toOption
-      }
-      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
-    }
-  }
-
-  private def getTokenRenewer(conf: Configuration): String = {
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    logDebug("delegation token renewer is: " + delegTokenRenewer)
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      val errorMessage = "Can't get Master Kerberos principal for use as 
renewer"
-      logError(errorMessage)
-      throw new SparkException(errorMessage)
-    }
-
-    delegTokenRenewer
-  }
-
-  private def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: 
SparkConf): Set[Path] = {
-    sparkConf.get(FILESYSTEMS_TO_ACCESS).map(new Path(_)).toSet +
-      sparkConf.get(STAGING_DIR).map(new Path(_))
-        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
deleted file mode 100644
index 16d8fc3..0000000
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.lang.reflect.UndeclaredThrowableException
-import java.security.PrivilegedExceptionAction
-
-import scala.reflect.runtime.universe
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.token.Token
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-private[security] class HiveCredentialProvider extends 
ServiceCredentialProvider with Logging {
-
-  override def serviceName: String = "hive"
-
-  private def hiveConf(hadoopConf: Configuration): Configuration = {
-    try {
-      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-      // The Hive configuration class is a subclass of the Hadoop Configuration class,
-      // so it can be treated as a Configuration and used without further reflection.
-      val hiveConfClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-      // Using the (Configuration, Class) constructor allows the current configuration
-      // to be included in the Hive config.
-      val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
-        classOf[Object].getClass)
-      ctor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration]
-    } catch {
-      case NonFatal(e) =>
-        logDebug("Fail to create Hive Configuration", e)
-        hadoopConf
-    }
-  }
-
-  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
-    UserGroupInformation.isSecurityEnabled &&
-      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
-  }
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    val conf = hiveConf(hadoopConf)
-
-    val principalKey = "hive.metastore.kerberos.principal"
-    val principal = conf.getTrimmed(principalKey, "")
-    require(principal.nonEmpty, s"Hive principal $principalKey undefined")
-    val metastoreUri = conf.getTrimmed("hive.metastore.uris", "")
-    require(metastoreUri.nonEmpty, "Hive metastore uri undefined")
-
-    val currentUser = UserGroupInformation.getCurrentUser()
-    logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} 
against " +
-      s"$principal at $metastoreUri")
-
-    val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
-    val hiveClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
-    val hiveConfClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-    val closeCurrent = hiveClass.getMethod("closeCurrent")
-
-    try {
-      // get all the instance methods before invoking any
-      val getDelegationToken = hiveClass.getMethod("getDelegationToken",
-        classOf[String], classOf[String])
-      val getHive = hiveClass.getMethod("get", hiveConfClass)
-
-      doAsRealUser {
-        val hive = getHive.invoke(null, conf)
-        val tokenStr = getDelegationToken.invoke(hive, 
currentUser.getUserName(), principal)
-          .asInstanceOf[String]
-        val hive2Token = new Token[DelegationTokenIdentifier]()
-        hive2Token.decodeFromUrlString(tokenStr)
-        logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
-        creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
-      }
-    } catch {
-      case NonFatal(e) =>
-        logDebug(s"Fail to get token from service $serviceName", e)
-    } finally {
-      Utils.tryLogNonFatalError {
-        closeCurrent.invoke(null)
-      }
-    }
-
-    None
-  }
-
-  /**
-   * Run some code as the real logged in user (which may differ from the 
current user, for
-   * example, when using proxying).
-   */
-  private def doAsRealUser[T](fn: => T): T = {
-    val currentUser = UserGroupInformation.getCurrentUser()
-    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
-
-    // For some reason the Scala-generated anonymous class ends up causing an
-    // UndeclaredThrowableException, even if you annotate the method with @throws.
-    try {
-      realUser.doAs(new PrivilegedExceptionAction[T]() {
-        override def run(): T = fn
-      })
-    } catch {
-      case e: UndeclaredThrowableException => throw 
Option(e.getCause()).getOrElse(e)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
index 4e3fcce..cc24ac4 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
@@ -35,7 +35,7 @@ trait ServiceCredentialProvider {
   def serviceName: String
 
   /**
-   * To decide whether credential is required for this service. By default it 
based on whether
+   * Returns true if credentials are required by this service. By default, it 
is based on whether
    * Hadoop security is enabled.
    */
   def credentialsRequired(hadoopConf: Configuration): Boolean = {
@@ -44,6 +44,7 @@ trait ServiceCredentialProvider {
 
   /**
    * Obtain credentials for this service and get the time of the next renewal.
+   *
   * @param hadoopConf Configuration of the current Hadoop-compatible system.
    * @param sparkConf Spark configuration.
    * @param creds Credentials to add tokens and security keys to.

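For reference, a minimal provider written against this trait might look like the
following sketch. It is not part of this patch, and the package, class, and
service names are made up for illustration:

```
package com.example.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // Name under which this provider can be enabled or disabled.
  override def serviceName: String = "myservice"

  // The default implementation checks whether Hadoop security is enabled;
  // override it if the service has its own notion of when tokens are needed.
  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch a delegation token from the external service, add it to creds,
    // and return the time of the next renewal (None if not renewable).
    None
  }
}
```

Such a provider is picked up via Java service loading; the registration file
format is visible in the test resource further down in this diff.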
http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
new file mode 100644
index 0000000..bbd17c8
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This class loads delegation token providers registered under the 
YARN-specific
+ * [[ServiceCredentialProvider]] interface, as well as the builtin providers 
defined
+ * in [[HadoopDelegationTokenManager]].
+ */
+private[yarn] class YARNHadoopDelegationTokenManager(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
+    fileSystems: Set[FileSystem]) extends Logging {
+
+  private val delegationTokenManager =
+    new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)
+
+  // public for testing
+  val credentialProviders = getCredentialProviders
+
+  /**
+   * Writes delegation tokens to creds.  Delegation tokens are fetched from 
all registered
+   * providers.
+   *
+   * @return Time after which the fetched delegation tokens should be renewed.
+   */
+  def obtainDelegationTokens(hadoopConf: Configuration, creds: Credentials): 
Long = {
+    val superInterval = 
delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
+
+    credentialProviders.values.flatMap { provider =>
+      if (provider.credentialsRequired(hadoopConf)) {
+        provider.obtainCredentials(hadoopConf, sparkConf, creds)
+      } else {
+        logDebug(s"Service ${provider.serviceName} does not require a token." +
+          s" Check your configuration to see if security is disabled or not.")
+        None
+      }
+    }.foldLeft(superInterval)(math.min)
+  }
+
+  private def getCredentialProviders: Map[String, ServiceCredentialProvider] = {
+    val providers = loadCredentialProviders
+
+    providers
+      .filter { p => delegationTokenManager.isServiceEnabled(p.serviceName) }
+      .map { p => (p.serviceName, p) }
+      .toMap
+  }
+
+  private def loadCredentialProviders: List[ServiceCredentialProvider] = {
+    ServiceLoader.load(classOf[ServiceCredentialProvider], 
Utils.getContextOrSparkClassLoader)
+      .asScala
+      .toList
+  }
+}

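For orientation, a rough sketch of how code in the YARN module might drive the
new manager. This is illustrative only: the class is private[yarn], and the
construction below mirrors the new test suite later in this diff:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val sparkConf = new SparkConf()
val hadoopConf = new Configuration()
val fileSystems: Set[FileSystem] =
  YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)

val credentialManager =
  new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)

// Fetches tokens from the built-in core providers and from any service-loaded
// ServiceCredentialProvider, writes them into creds, and returns the earliest
// time at which one of the fetched tokens should be renewed.
val creds = new Credentials()
val nextRenewal: Long = credentialManager.obtainDelegationTokens(hadoopConf, creds)
```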
http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 
b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
index d0ef5ef..f31c232 100644
--- 
a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ 
b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1 +1 @@
-org.apache.spark.deploy.yarn.security.TestCredentialProvider
+org.apache.spark.deploy.yarn.security.YARNTestCredentialProvider

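Registering a custom provider mirrors the test resource above: list the
implementation's fully qualified class name in a provider-configuration file on
the classpath. For the hypothetical provider sketched earlier, that file would
be (illustrative):

```
# META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
com.example.security.MyServiceCredentialProvider
```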
http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
deleted file mode 100644
index b0067aa..0000000
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.Token
-import org.scalatest.{BeforeAndAfter, Matchers}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-
-class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfter {
-  private var credentialManager: ConfigurableCredentialManager = null
-  private var sparkConf: SparkConf = null
-  private var hadoopConf: Configuration = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    sparkConf = new SparkConf()
-    hadoopConf = new Configuration()
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  override def afterAll(): Unit = {
-    System.clearProperty("SPARK_YARN_MODE")
-
-    super.afterAll()
-  }
-
-  test("Correctly load default credential providers") {
-    credentialManager = new ConfigurableCredentialManager(sparkConf, 
hadoopConf)
-
-    credentialManager.getServiceCredentialProvider("hadoopfs") should not be 
(None)
-    credentialManager.getServiceCredentialProvider("hbase") should not be 
(None)
-    credentialManager.getServiceCredentialProvider("hive") should not be (None)
-  }
-
-  test("disable hive credential provider") {
-    sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
-    credentialManager = new ConfigurableCredentialManager(sparkConf, 
hadoopConf)
-
-    credentialManager.getServiceCredentialProvider("hadoopfs") should not be 
(None)
-    credentialManager.getServiceCredentialProvider("hbase") should not be 
(None)
-    credentialManager.getServiceCredentialProvider("hive") should be (None)
-  }
-
-  test("using deprecated configurations") {
-    sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
-    sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
-    credentialManager = new ConfigurableCredentialManager(sparkConf, 
hadoopConf)
-
-    credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
-    credentialManager.getServiceCredentialProvider("hive") should be (None)
-    credentialManager.getServiceCredentialProvider("test") should not be (None)
-    credentialManager.getServiceCredentialProvider("hbase") should not be 
(None)
-  }
-
-  test("verify obtaining credentials from provider") {
-    credentialManager = new ConfigurableCredentialManager(sparkConf, 
hadoopConf)
-    val creds = new Credentials()
-
-    // Tokens can only be obtained from TestCredentialProvider; HDFS, HBase, and Hive
-    // tokens cannot be obtained.
-    credentialManager.obtainCredentials(hadoopConf, creds)
-    val tokens = creds.getAllTokens
-    tokens.size() should be (1)
-    tokens.iterator().next().getService should be (new Text("test"))
-  }
-
-  test("verify getting credential renewal info") {
-    credentialManager = new ConfigurableCredentialManager(sparkConf, 
hadoopConf)
-    val creds = new Credentials()
-
-    val testCredentialProvider = 
credentialManager.getServiceCredentialProvider("test").get
-      .asInstanceOf[TestCredentialProvider]
-    // Only TestCredentialProvider can provide the time of the next token renewal.
-    val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
-    nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
-  }
-
-  test("obtain tokens For HiveMetastore") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
-    // Thrift picks up on port 0 and bails out without trying to talk to the endpoint.
-    hadoopConf.set("hive.metastore.uris", "http://localhost:0";)
-
-    val hiveCredentialProvider = new HiveCredentialProvider()
-    val credentials = new Credentials()
-    hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, 
credentials)
-
-    credentials.getAllTokens.size() should be (0)
-  }
-
-  test("Obtain tokens For HBase") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hbase.security.authentication", "kerberos")
-
-    val hbaseTokenProvider = new HBaseCredentialProvider()
-    val creds = new Credentials()
-    hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
-
-    creds.getAllTokens.size should be (0)
-  }
-}
-
-class TestCredentialProvider extends ServiceCredentialProvider {
-  val tokenRenewalInterval = 86400 * 1000L
-  var timeOfNextTokenRenewal = 0L
-
-  override def serviceName: String = "test"
-
-  override def credentialsRequired(conf: Configuration): Boolean = true
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    if (creds == null) {
-      // Guard against other unit test failures.
-      return None
-    }
-
-    val emptyToken = new Token()
-    emptyToken.setService(new Text("test"))
-    creds.addToken(emptyToken.getService, emptyToken)
-
-    val currTime = System.currentTimeMillis()
-    timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + 
tokenRenewalInterval
-
-    Some(timeOfNextTokenRenewal)
-  }
-}

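The per-service enable flags exercised by the deleted suite reach the new code
path through delegationTokenManager.isServiceEnabled in
YARNHadoopDelegationTokenManager. Assuming those flag names are still honored
by the core manager (an assumption based on this diff, not a guarantee),
disabling a provider would still look like this sketch:

```
import org.apache.spark.SparkConf

// Flag names taken from the deleted tests above; the second form was already
// deprecated there.
val conf = new SparkConf()
  .set("spark.yarn.security.credentials.hive.enabled", "false")
  .set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
```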
http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
deleted file mode 100644
index f50ee19..0000000
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.scalatest.{Matchers, PrivateMethodTester}
-
-import org.apache.spark.{SparkException, SparkFunSuite}
-
-class HadoopFSCredentialProviderSuite
-    extends SparkFunSuite
-    with PrivateMethodTester
-    with Matchers {
-  private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
-
-  private def getTokenRenewer(
-      fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): 
String = {
-    fsCredentialProvider invokePrivate _getTokenRenewer(conf)
-  }
-
-  private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    if (hadoopFsCredentialProvider == null) {
-      hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
-    }
-  }
-
-  override def afterAll() {
-    if (hadoopFsCredentialProvider != null) {
-      hadoopFsCredentialProvider = null
-    }
-
-    super.afterAll()
-  }
-
-  test("check token renewer") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
-    hadoopConf.set("yarn.resourcemanager.principal", 
"yarn/myrm:8...@sparktest.com")
-    val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
-    renewer should be ("yarn/myrm:8...@sparktest.com")
-  }
-
-  test("check token renewer default") {
-    val hadoopConf = new Configuration()
-    val caught =
-      intercept[SparkException] {
-        getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
-      }
-    assert(caught.getMessage === "Can't get Master Kerberos principal for use 
as renewer")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a18d6371/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
new file mode 100644
index 0000000..2b226ef
--- /dev/null
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+
+class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with 
Matchers {
+  private var credentialManager: YARNHadoopDelegationTokenManager = null
+  private var sparkConf: SparkConf = null
+  private var hadoopConf: Configuration = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    System.setProperty("SPARK_YARN_MODE", "true")
+
+    sparkConf = new SparkConf()
+    hadoopConf = new Configuration()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+
+    System.clearProperty("SPARK_YARN_MODE")
+  }
+
+  test("Correctly loads credential providers") {
+    credentialManager = new YARNHadoopDelegationTokenManager(
+      sparkConf,
+      hadoopConf,
+      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+
+    credentialManager.credentialProviders.get("yarn-test") should not be (None)
+  }
+}
+
+class YARNTestCredentialProvider extends ServiceCredentialProvider {
+  override def serviceName: String = "yarn-test"
+
+  override def credentialsRequired(conf: Configuration): Boolean = true
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = None
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
