Repository: spark Updated Branches: refs/heads/master 6f1d0dea1 -> dc2714da5
[SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.

Hive delegation tokens are only needed when the Spark driver has no access to the Kerberos TGT. That happens in only two situations:
- when using a proxy user
- when using cluster mode without a keytab

This change modifies the Hive provider so that it generates delegation tokens only in those situations. It also tweaks the YARN AM so that it makes the proper user visible to the Hive code when running with keytabs, so that the TGT can be used instead of a delegation token.

The effect of this change is that it is now possible to initialize multiple, non-concurrent SparkContext instances in the same JVM. Before, the second invocation would fail to fetch a new Hive delegation token, which could then make the second (or third, or ...) application fail once the token expired. With this change, the TGT will be used to authenticate to the HMS (Hive metastore) instead.

This change also avoids polluting the current logged-in user's credentials when launching applications. The credentials are copied only when running applications as a proxy user. This makes it possible to implement SPARK-11035 later, where multiple threads might be launching applications and each app should have its own set of credentials.

Tested by verifying HDFS and Hive access in the following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- PySpark app that creates (and stops) multiple SparkContext instances throughout its lifetime

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19509 from vanzin/SPARK-22290. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc2714da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc2714da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc2714da Branch: refs/heads/master Commit: dc2714da50ecba1bf1fdf555a82a4314f763a76e Parents: 6f1d0de Author: Marcelo Vanzin <van...@cloudera.com> Authored: Thu Oct 19 14:56:48 2017 +0800 Committer: jerryshao <ss...@hortonworks.com> Committed: Thu Oct 19 14:56:48 2017 +0800 ---------------------------------------------------------------------- .../apache/spark/deploy/SparkHadoopUtil.scala | 17 +++-- .../security/HBaseDelegationTokenProvider.scala | 4 +- .../security/HadoopDelegationTokenManager.scala | 2 +- .../HadoopDelegationTokenProvider.scala | 2 +- .../HadoopFSDelegationTokenProvider.scala | 4 +- .../security/HiveDelegationTokenProvider.scala | 20 +++++- docs/running-on-yarn.md | 9 +++ .../spark/deploy/yarn/ApplicationMaster.scala | 69 ++++++++++++++++---- .../org/apache/spark/deploy/yarn/Client.scala | 5 +- .../org/apache/spark/deploy/yarn/config.scala | 4 ++ .../spark/sql/hive/client/HiveClientImpl.scala | 6 -- 11 files changed, 110 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 53775db..1fa10ab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -61,13 +61,17 @@ class SparkHadoopUtil extends Logging { * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems */ def runAsSparkUser(func: () => Unit) { + 
createSparkUser().doAs(new PrivilegedExceptionAction[Unit] { + def run: Unit = func() + }) + } + + def createSparkUser(): UserGroupInformation = { val user = Utils.getCurrentUserName() - logDebug("running as user: " + user) + logDebug("creating UGI for user: " + user) val ugi = UserGroupInformation.createRemoteUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) - ugi.doAs(new PrivilegedExceptionAction[Unit] { - def run: Unit = func() - }) + ugi } def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { @@ -417,6 +421,11 @@ class SparkHadoopUtil extends Logging { creds.readTokenStorageStream(new DataInputStream(tokensBuf)) creds } + + def isProxyUser(ugi: UserGroupInformation): Boolean = { + ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY + } + } object SparkHadoopUtil { http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala index 78b0e6b..5dcde4e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala @@ -56,7 +56,9 @@ private[security] class HBaseDelegationTokenProvider None } - override def delegationTokensRequired(hadoopConf: Configuration): Boolean = { + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos" } http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index c134b7e..483d0de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -115,7 +115,7 @@ private[spark] class HadoopDelegationTokenManager( hadoopConf: Configuration, creds: Credentials): Long = { delegationTokenProviders.values.flatMap { provider => - if (provider.delegationTokensRequired(hadoopConf)) { + if (provider.delegationTokensRequired(sparkConf, hadoopConf)) { provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) } else { logDebug(s"Service ${provider.serviceName} does not require a token." + http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala index 1ba245e..ed09050 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala @@ -37,7 +37,7 @@ private[spark] trait HadoopDelegationTokenProvider { * Returns true if delegation tokens are required for this service. By default, it is based on * whether Hadoop security is enabled. 
*/ - def delegationTokensRequired(hadoopConf: Configuration): Boolean + def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean /** * Obtain delegation tokens for this service and get the time of the next renewal. http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 300773c..21ca669 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -69,7 +69,9 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration nextRenewalDate } - def delegationTokensRequired(hadoopConf: Configuration): Boolean = { + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { UserGroupInformation.isSecurityEnabled } http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala index b31cc59..ece5ce7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala @@ -31,7 +31,9 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token import 
org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.KEYTAB import org.apache.spark.util.Utils private[security] class HiveDelegationTokenProvider @@ -55,9 +57,21 @@ private[security] class HiveDelegationTokenProvider } } - override def delegationTokensRequired(hadoopConf: Configuration): Boolean = { + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { + // Delegation tokens are needed only when: + // - trying to connect to a secure metastore + // - either deploying in cluster mode without a keytab, or impersonating another user + // + // Other modes (such as client with or without keytab, or cluster mode with keytab) do not need + // a delegation token, since there's a valid kerberos TGT for the right user available to the + // driver, which is the only process that connects to the HMS. + val deployMode = sparkConf.get("spark.submit.deployMode", "client") UserGroupInformation.isSecurityEnabled && - hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty + hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty && + (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser()) || + (deployMode == "cluster" && !sparkConf.contains(KEYTAB))) } override def obtainDelegationTokens( @@ -83,7 +97,7 @@ private[security] class HiveDelegationTokenProvider val hive2Token = new Token[DelegationTokenIdentifier]() hive2Token.decodeFromUrlString(tokenStr) - logInfo(s"Get Token from hive metastore: ${hive2Token.toString}") + logDebug(s"Get Token from hive metastore: ${hive2Token.toString}") creds.addToken(new Text("hive.server2.delegation.token"), hive2Token) } http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/docs/running-on-yarn.md ---------------------------------------------------------------------- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 
4326395..9599d40 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -402,6 +402,15 @@ To use a custom metrics.properties for the application master and executors, upd </td> </tr> <tr> + <td><code>spark.yarn.kerberos.relogin.period</code></td> + <td>1m</td> + <td> + How often to check whether the kerberos TGT should be renewed. This should be set to a value + that is shorter than the TGT renewal period (or the TGT lifetime if TGT renewal is not enabled). + The default value should be enough for most deployments. + </td> +</tr> +<tr> <td><code>spark.yarn.config.gatewayPath</code></td> <td>(none)</td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e227bff..f616723 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import java.lang.reflect.InvocationTargetException import java.net.{Socket, URI, URL} +import java.security.PrivilegedExceptionAction import java.util.concurrent.{TimeoutException, TimeUnit} import scala.collection.mutable.HashMap @@ -28,6 +29,7 @@ import scala.concurrent.duration.Duration import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -49,10 +51,7 @@ import org.apache.spark.util._ /** * Common application 
master functionality for Spark on Yarn. */ -private[spark] class ApplicationMaster( - args: ApplicationMasterArguments, - client: YarnRMClient) - extends Logging { +private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging { // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. @@ -62,6 +61,46 @@ private[spark] class ApplicationMaster( .asInstanceOf[YarnConfiguration] private val isClusterMode = args.userClass != null + private val ugi = { + val original = UserGroupInformation.getCurrentUser() + + // If a principal and keytab were provided, log in to kerberos, and set up a thread to + // renew the kerberos ticket when needed. Because the UGI API does not expose the TTL + // of the TGT, use a configuration to define how often to check that a relogin is necessary. + // checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed. + val principal = sparkConf.get(PRINCIPAL).orNull + val keytab = sparkConf.get(KEYTAB).orNull + if (principal != null && keytab != null) { + UserGroupInformation.loginUserFromKeytab(principal, keytab) + + val renewer = new Thread() { + override def run(): Unit = Utils.tryLogNonFatalError { + while (true) { + TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD)) + UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab() + } + } + } + renewer.setName("am-kerberos-renewer") + renewer.setDaemon(true) + renewer.start() + + // Transfer the original user's tokens to the new user, since that's needed to connect to + // YARN. It also copies over any delegation tokens that might have been created by the + // client, which will then be transferred over when starting executors (until new ones + // are created by the periodic task). 
+ val newUser = UserGroupInformation.getCurrentUser() + SparkHadoopUtil.get.transferCredentials(original, newUser) + newUser + } else { + SparkHadoopUtil.get.createSparkUser() + } + } + + private val client = ugi.doAs(new PrivilegedExceptionAction[YarnRMClient]() { + def run: YarnRMClient = new YarnRMClient() + }) + // Default to twice the number of executors (twice the maximum number of executors if dynamic // allocation is enabled), with a minimum of 3. @@ -201,6 +240,13 @@ private[spark] class ApplicationMaster( } final def run(): Int = { + ugi.doAs(new PrivilegedExceptionAction[Unit]() { + def run: Unit = runImpl() + }) + exitCode + } + + private def runImpl(): Unit = { try { val appAttemptId = client.getAttemptId() @@ -254,11 +300,6 @@ private[spark] class ApplicationMaster( } } - // Call this to force generation of secret so it gets populated into the - // Hadoop UGI. This has to happen before the startUserApplication which does a - // doAs in order for the credentials to be passed on to the executor containers. - val securityMgr = new SecurityManager(sparkConf) - // If the credentials file config is present, we must periodically renew tokens. So create // a new AMDelegationTokenRenewer if (sparkConf.contains(CREDENTIALS_FILE_PATH)) { @@ -284,6 +325,9 @@ private[spark] class ApplicationMaster( credentialRenewerThread.join() } + // Call this to force generation of secret so it gets populated into the Hadoop UGI. 
+ val securityMgr = new SecurityManager(sparkConf) + if (isClusterMode) { runDriver(securityMgr) } else { @@ -297,7 +341,6 @@ private[spark] class ApplicationMaster( ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, "Uncaught exception: " + e) } - exitCode } /** @@ -775,10 +818,8 @@ object ApplicationMaster extends Logging { sys.props(k) = v } } - SparkHadoopUtil.get.runAsSparkUser { () => - master = new ApplicationMaster(amArgs, new YarnRMClient) - System.exit(master.run()) - } + master = new ApplicationMaster(amArgs) + System.exit(master.run()) } private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 64b2b4d..1fe25c4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -394,7 +394,10 @@ private[spark] class Client( if (credentials != null) { // Add credentials to current user's UGI, so that following operations don't need to use the // Kerberos tgt to get delegations again in the client side. 
- UserGroupInformation.getCurrentUser.addCredentials(credentials) + val currentUser = UserGroupInformation.getCurrentUser() + if (SparkHadoopUtil.get.isProxyUser(currentUser)) { + currentUser.addCredentials(credentials) + } logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) } http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 187803c..e1af8ba 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -347,6 +347,10 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(Long.MaxValue) + private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.yarn.kerberos.relogin.period") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("1m") + // The list of cache-related config entries. This is used by Client and the AM to clean // up the environment so that these settings do not appear on the web UI. 
private[yarn] val CACHE_CONFIGS = Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index a01c312..16c95c5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -111,12 +111,6 @@ private[hive] class HiveClientImpl( if (clientLoader.isolationOn) { // Switch to the initClassLoader. Thread.currentThread().setContextClassLoader(initClassLoader) - // Set up kerberos credentials for UserGroupInformation.loginUser within current class loader - if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) { - val principal = sparkConf.get("spark.yarn.principal") - val keytab = sparkConf.get("spark.yarn.keytab") - SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab) - } try { newState() } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org