Repository: spark
Updated Branches:
  refs/heads/master d3f4a2119 -> cde64add1


[SPARK-21411][YARN] Lazily create FS within kerberized UGI to avoid token acquisition failure

## What changes were proposed in this pull request?

In the current `YARNHadoopDelegationTokenManager`, the `FileSystem` instances used to fetch tokens are created outside of the KDC-logged-in UGI, and using these `FileSystem` instances to obtain new tokens throws an exception. The root cause is that Spark tries to get new tokens from `FileSystem`s created with a token-authenticated UGI, while Hadoop only grants new tokens to a Kerberos-authenticated UGI. To fix this, we lazily create these `FileSystem` instances within the KDC-logged-in UGI.
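
A minimal sketch of the idea (the names `TokenFetcher`, `loginUGI`, and the `"yarn"` renewer below are illustrative, not the actual Spark API; the real change is in the diff, where providers now take a `Configuration => Set[FileSystem]` function instead of a pre-built `Set[FileSystem]`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

// Illustrative sketch only: the provider receives a function instead of
// pre-built FileSystem instances and invokes it lazily, so the FileSystems
// are created by whichever UGI is current when tokens are actually fetched.
class TokenFetcher(fileSystems: Configuration => Set[FileSystem]) {

  def obtainTokens(hadoopConf: Configuration, renewer: String): Credentials = {
    val creds = new Credentials()
    // The FileSystems are created here, inside the caller's (Kerberos) UGI,
    // so Hadoop can grant new delegation tokens for them.
    fileSystems(hadoopConf).foreach(_.addDelegationTokens(renewer, creds))
    creds
  }
}

// Hypothetical usage: run the fetch inside the KDC-logged-in UGI (e.g. via
// UserGroupInformation.doAs) so that FileSystem creation and token acquisition
// both see Kerberos credentials rather than delegation tokens:
//
//   val fetcher = new TokenFetcher(conf => Set(FileSystem.get(conf)))
//   val creds = loginUGI.doAs(new PrivilegedExceptionAction[Credentials] {
//     override def run(): Credentials = fetcher.obtainTokens(hadoopConf, "yarn")
//   })
```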

## How was this patch tested?

Manual verification in a secure cluster.

CC vanzin mgummelt, please help review, thanks!

Author: jerryshao <[email protected]>

Closes #18633 from jerryshao/SPARK-21411.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cde64add
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cde64add
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cde64add

Branch: refs/heads/master
Commit: cde64add18dac712c48de0637f1979f1043e333e
Parents: d3f4a21
Author: jerryshao <[email protected]>
Authored: Tue Jul 18 11:44:01 2017 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Jul 18 11:44:01 2017 -0700

----------------------------------------------------------------------
 .../spark/deploy/security/HadoopDelegationTokenManager.scala | 2 +-
 .../deploy/security/HadoopFSDelegationTokenProvider.scala    | 7 ++++---
 .../deploy/security/HadoopDelegationTokenManagerSuite.scala  | 8 ++++----
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala     | 2 +-
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +-
 .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala   | 2 +-
 .../yarn/security/YARNHadoopDelegationTokenManager.scala     | 2 +-
 .../security/YARNHadoopDelegationTokenManagerSuite.scala     | 2 +-
 8 files changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 89b6f52..01cbfe1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 private[spark] class HadoopDelegationTokenManager(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    fileSystems: Set[FileSystem])
+    fileSystems: Configuration => Set[FileSystem])
   extends Logging {
 
   private val deprecatedProviderEnabledConfigs = List(

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 13157f3..f0ac7f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 
-private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
+private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
     extends HadoopDelegationTokenProvider with Logging {
 
   // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
@@ -43,13 +43,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSyste
       hadoopConf: Configuration,
       creds: Credentials): Option[Long] = {
 
+    val fsToGetTokens = fileSystems(hadoopConf)
     val newCreds = fetchDelegationTokens(
       getTokenRenewer(hadoopConf),
-      fileSystems)
+      fsToGetTokens)
 
     // Get the token renewal interval if it is not set. It will only be called once.
     if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
     }
 
     // Get the time of next renewal.

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 335f344..5b05521 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -40,7 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
 
     delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
     delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
@@ -53,7 +53,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
 
     delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
     delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
@@ -66,7 +66,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
 
     delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None)
     delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
@@ -77,7 +77,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     delegationTokenManager = new HadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      hadoopFSsToAccess(hadoopConf))
+      hadoopFSsToAccess)
     val creds = new Credentials()
 
     // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ce290c3..6ff210a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -270,7 +270,7 @@ private[spark] class ApplicationMaster(
             val credentialManager = new YARNHadoopDelegationTokenManager(
               sparkConf,
               yarnConf,
-              YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+              conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
 
             val credentialRenewer =
               new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 3a7adb7..d408ca9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -125,7 +125,7 @@ private[spark] class Client(
   private val credentialManager = new YARNHadoopDelegationTokenManager(
     sparkConf,
     hadoopConf,
-    YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+    conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
 
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a687f67..4fef439 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val credentialManager = new YARNHadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+      conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
     credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
     credentialUpdater.start()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index bbd17c8..163cfb4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util.Utils
 private[yarn] class YARNHadoopDelegationTokenManager(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    fileSystems: Set[FileSystem]) extends Logging {
+    fileSystems: Configuration => Set[FileSystem]) extends Logging {
 
   private val delegationTokenManager =
     new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)

http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 2b226ef..c918998 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
     credentialManager = new YARNHadoopDelegationTokenManager(
       sparkConf,
       hadoopConf,
-      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+      conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf))
 
     credentialManager.credentialProviders.get("yarn-test") should not be (None)
   }

