Repository: spark
Updated Branches:
  refs/heads/master a61551356 -> 4239a1081


[SPARK-19021][YARN] Generalize HDFSCredentialProvider to support non-HDFS security filesystems

Currently Spark can only obtain a token renewal interval from secure HDFS
(hdfs://). When Spark runs against other secure filesystems such as webHDFS
(webhdfs://), WASB (wasb://), or ADLS, it ignores those tokens and cannot
determine their renewal intervals, which leaves Spark unable to work with these
secure clusters. So instead of checking only for the HDFS token, this change
generalizes the provider to support any DelegationTokenIdentifier.
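
In essence, rather than filtering tokens by HDFS_DELEGATION_KIND, the new
provider decodes each token and accepts any AbstractDelegationTokenIdentifier.
A condensed sketch of that logic (object and method names here are illustrative
only; the real implementation is HadoopFSCredentialProvider in the diff below):

    import scala.collection.JavaConverters._

    import org.apache.hadoop.security.Credentials
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

    object TokenRenewalSketch {
      // Earliest next-renewal time across all delegation tokens, regardless of
      // which filesystem (hdfs, webhdfs, wasb, adl, ...) issued them.
      def nextRenewalDate(creds: Credentials, renewalInterval: Long): Option[Long] = {
        val dates = creds.getAllTokens.asScala
          .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
          .map { token =>
            val id = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
            id.getIssueDate + renewalInterval
          }
        if (dates.isEmpty) None else Some(dates.min)
      }
    }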

## How was this patch tested?

Manually verified in a secure cluster.

Author: jerryshao <ss...@hortonworks.com>

Closes #16432 from jerryshao/SPARK-19021.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4239a108
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4239a108
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4239a108

Branch: refs/heads/master
Commit: 4239a1081ad96a503fbf9277e42b97422bb8af3e
Parents: a615513
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jan 11 09:24:02 2017 -0600
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Wed Jan 11 09:24:02 2017 -0600

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |  12 +-
 ...ploy.yarn.security.ServiceCredentialProvider |   2 +-
 .../ConfigurableCredentialManager.scala         |   2 +-
 .../yarn/security/HDFSCredentialProvider.scala  | 111 -----------------
 .../security/HadoopFSCredentialProvider.scala   | 120 +++++++++++++++++++
 .../ConfigurableCredentialManagerSuite.scala    |   8 +-
 .../security/HDFSCredentialProviderSuite.scala  |  71 -----------
 .../HadoopFSCredentialProviderSuite.scala       |  71 +++++++++++
 8 files changed, 203 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index a072975..f751345 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -479,12 +479,12 @@ Hadoop services issue *hadoop tokens* to grant access to the services and data.
 Clients must first acquire tokens for the services they will access and pass them along with their
 application as it is launched in the YARN cluster.
 
-For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens
+For a Spark application to interact with any of the Hadoop filesystem (for example hdfs, webhdfs, etc), HBase and Hive, it must acquire the relevant tokens
 using the Kerberos credentials of the user launching the application
 —that is, the principal whose identity will become that of the launched Spark application.
 
 This is normally done at launch time: in a secure cluster Spark will automatically obtain a
-token for the cluster's HDFS filesystem, and potentially for HBase and Hive.
+token for the cluster's default Hadoop filesystem, and potentially for HBase and Hive.
 
 An HBase token will be obtained if HBase is in on classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
@@ -494,12 +494,12 @@ Similarly, a Hive token will be obtained if Hive is on the classpath, its config
 includes a URI of the metadata store in `"hive.metastore.uris`, and
 `spark.yarn.security.credentials.hive.enabled` is not set to `false`.
 
-If an application needs to interact with other secure HDFS clusters, then
+If an application needs to interact with other secure Hadoop filesystems, then
 the tokens needed to access these clusters must be explicitly requested at
 launch time. This is done by listing them in the `spark.yarn.access.namenodes` property.
 
 ```
-spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
+spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/
 ```
 
 Spark supports integrating with other security-aware services through Java Services mechanism (see
@@ -558,8 +558,8 @@ For Spark applications, the Oozie workflow must be set up for Oozie to request a
 the application needs, including:
 
 - The YARN resource manager.
-- The local HDFS filesystem.
-- Any remote HDFS filesystems used as a source or destination of I/O.
+- The local Hadoop filesystem.
+- Any remote Hadoop filesystems used as a source or destination of I/O.
 - Hive —if used.
 - HBase —if used.
 - The YARN timeline server, if the application interacts with this.

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
index 22ead56..f5a807e 100644
--- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1,3 +1,3 @@
-org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
 org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
 org.apache.spark.deploy.yarn.security.HiveCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
index 933736b..4f4be52 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 /**
  * A ConfigurableCredentialManager to manage all the registered credential providers and offer
  * APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
  * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
  * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
  * interface and put into resources/META-INF/services to be loaded by ServiceLoader.

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
deleted file mode 100644
index ebb176b..0000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.io.{ByteArrayInputStream, DataInputStream}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-
-private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
-  // Token renewal interval, this value will be set in the first call,
-  // if None means no token renewer specified, so cannot get token renewal interval.
-  private var tokenRenewalInterval: Option[Long] = null
-
-  override val serviceName: String = "hdfs"
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    // NameNode to access, used to get tokens from different FileSystems
-    nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
-      val dstFs = dst.getFileSystem(hadoopConf)
-      logInfo("getting token for namenode: " + dst)
-      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
-    }
-
-    // Get the token renewal interval if it is not set. It will only be called once.
-    if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
-    }
-
-    // Get the time of next renewal.
-    tokenRenewalInterval.map { interval =>
-      creds.getAllTokens.asScala
-        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-        .map { t =>
-          val identifier = new DelegationTokenIdentifier()
-          identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-          identifier.getIssueDate + interval
-      }.foldLeft(0L)(math.max)
-    }
-  }
-
-  private def getTokenRenewalInterval(
-      hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
-    // We cannot use the tokens generated with renewer yarn. Trying to renew
-    // those will fail with an access control issue. So create new tokens with the logged in
-    // user as renewer.
-    sparkConf.get(PRINCIPAL).flatMap { renewer =>
-      val creds = new Credentials()
-      nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
-        val dstFs = dst.getFileSystem(hadoopConf)
-        dstFs.addDelegationTokens(renewer, creds)
-      }
-      val hdfsToken = creds.getAllTokens.asScala
-        .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      hdfsToken.map { t =>
-        val newExpiration = t.renew(hadoopConf)
-        val identifier = new DelegationTokenIdentifier()
-        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal Interval is $interval")
-        interval
-      }
-    }
-  }
-
-  private def getTokenRenewer(conf: Configuration): String = {
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    logDebug("delegation token renewer is: " + delegTokenRenewer)
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-      logError(errorMessage)
-      throw new SparkException(errorMessage)
-    }
-
-    delegTokenRenewer
-  }
-
-  private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
-      sparkConf.get(STAGING_DIR).map(new Path(_))
-        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
new file mode 100644
index 0000000..b4fb4a7
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[security] class HadoopFSCredentialProvider
+    extends ServiceCredentialProvider with Logging {
+  // Token renewal interval, this value will be set in the first call,
+  // if None means no token renewer specified or no token can be renewed,
+  // so cannot get token renewal interval.
+  private var tokenRenewalInterval: Option[Long] = null
+
+  override val serviceName: String = "hadoopfs"
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    // NameNode to access, used to get tokens from different FileSystems
+    val tmpCreds = new Credentials()
+    val tokenRenewer = getTokenRenewer(hadoopConf)
+    nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+      val dstFs = dst.getFileSystem(hadoopConf)
+      logInfo("getting token for: " + dst)
+      dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
+    }
+
+    // Get the token renewal interval if it is not set. It will only be called once.
+    if (tokenRenewalInterval == null) {
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
+    }
+
+    // Get the time of next renewal.
+    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+      val nextRenewalDates = tmpCreds.getAllTokens.asScala
+        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
+        .map { t =>
+          val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          identifier.getIssueDate + interval
+        }
+      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
+    }
+
+    creds.addAll(tmpCreds)
+    nextRenewalDate
+  }
+
+  private def getTokenRenewalInterval(
+      hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
+    // We cannot use the tokens generated with renewer yarn. Trying to renew
+    // those will fail with an access control issue. So create new tokens with the logged in
+    // user as renewer.
+    sparkConf.get(PRINCIPAL).flatMap { renewer =>
+      val creds = new Credentials()
+      nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+        val dstFs = dst.getFileSystem(hadoopConf)
+        dstFs.addDelegationTokens(renewer, creds)
+      }
+
+      val renewIntervals = creds.getAllTokens.asScala.filter {
+        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+      }.flatMap { token =>
+        Try {
+          val newExpiration = token.renew(hadoopConf)
+          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          val interval = newExpiration - identifier.getIssueDate
+          logInfo(s"Renewal interval is $interval for token 
${token.getKind.toString}")
+          interval
+        }.toOption
+      }
+      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
+    }
+  }
+
+  private def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+
+    delegTokenRenewer
+  }
+
+  private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
+      sparkConf.get(STAGING_DIR).map(new Path(_))
+        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
index db4619e..b0067aa 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
   test("Correctly load default credential providers") {
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
     credentialManager.getServiceCredentialProvider("hive") should not be (None)
   }
@@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit
     sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be (None)
     credentialManager.getServiceCredentialProvider("hive") should be (None)
   }
 
   test("using deprecated configurations") {
-    sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+    sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
     sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
     credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
 
-    credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+    credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
     credentialManager.getServiceCredentialProvider("hive") should be (None)
     credentialManager.getServiceCredentialProvider("test") should not be (None)
     credentialManager.getServiceCredentialProvider("hbase") should not be 
(None)

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
deleted file mode 100644
index 7b2da3f..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.scalatest.{Matchers, PrivateMethodTester}
-
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-
-class HDFSCredentialProviderSuite
-    extends SparkFunSuite
-    with PrivateMethodTester
-    with Matchers {
-  private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
-
-  private def getTokenRenewer(
-      hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
-    hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
-  }
-
-  private var hdfsCredentialProvider: HDFSCredentialProvider = null
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    if (hdfsCredentialProvider == null) {
-      hdfsCredentialProvider = new HDFSCredentialProvider()
-    }
-  }
-
-  override def afterAll() {
-    if (hdfsCredentialProvider != null) {
-      hdfsCredentialProvider = null
-    }
-
-    super.afterAll()
-  }
-
-  test("check token renewer") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
-    hadoopConf.set("yarn.resourcemanager.principal", 
"yarn/myrm:8...@sparktest.com")
-    val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
-    renewer should be ("yarn/myrm:8...@sparktest.com")
-  }
-
-  test("check token renewer default") {
-    val hadoopConf = new Configuration()
-    val caught =
-      intercept[SparkException] {
-        getTokenRenewer(hdfsCredentialProvider, hadoopConf)
-      }
-    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4239a108/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
new file mode 100644
index 0000000..0eb2512
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Matchers, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class HadoopFSCredentialProviderSuite
+    extends SparkFunSuite
+    with PrivateMethodTester
+    with Matchers {
+  private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
+
+  private def getTokenRenewer(
+      fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
+    fsCredentialProvider invokePrivate _getTokenRenewer(conf)
+  }
+
+  private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    if (hadoopFsCredentialProvider == null) {
+      hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
+    }
+  }
+
+  override def afterAll() {
+    if (hadoopFsCredentialProvider != null) {
+      hadoopFsCredentialProvider = null
+    }
+
+    super.afterAll()
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", 
"yarn/myrm:8...@sparktest.com")
+    val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
+    renewer should be ("yarn/myrm:8...@sparktest.com")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+}

