http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala new file mode 100644 index 0000000..7e76f40 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.yarn.security + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.ThreadUtils + +/** + * The following methods are primarily meant to make sure long-running apps like Spark + * Streaming apps can run without interruption while accessing secured services. The + * scheduleLoginFromKeytab method is called on the AM to get the new credentials. + * This method wakes up a thread that logs into the KDC + * once 75% of the renewal interval of the original credentials used for the container + * has elapsed. It then obtains new credentials and writes them to HDFS in a + * pre-specified location - the prefix of which is specified in the sparkConf by + * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc. + * - each update goes to a new file, with a monotonically increasing suffix), also the + * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater. + * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed. + * + * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is + * called once 80% of the validity of the original credentials has elapsed. At that time the + * executor finds the credentials file with the latest timestamp and checks if it has read those + * credentials before (by keeping track of the suffix of the last file it read). If a new file has + * appeared, it will read the credentials and update the currently running UGI with it. 
This + * process happens again once 80% of the validity of this has expired. + */ +private[yarn] class AMCredentialRenewer( + sparkConf: SparkConf, + hadoopConf: Configuration, + credentialManager: ConfigurableCredentialManager) extends Logging { + + private var lastCredentialsFileSuffix = 0 + + private val credentialRenewer = + Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + private val hadoopUtil = YarnSparkHadoopUtil.get + + private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) + private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) + private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT) + private val freshHadoopConf = + hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme) + + @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME) + + /** + * Schedule a login from the keytab and principal set using the --principal and --keytab + * arguments to spark-submit. This login happens only when the credentials of the current user + * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from + * SparkConf to do the login. This method is a no-op in non-YARN mode. + * + */ + private[spark] def scheduleLoginFromKeytab(): Unit = { + val principal = sparkConf.get(PRINCIPAL).get + val keytab = sparkConf.get(KEYTAB).get + + /** + * Schedule re-login and creation of new credentials. If credentials have already expired, this + * method will synchronously create new ones. + */ + def scheduleRenewal(runnable: Runnable): Unit = { + // Run now! + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { + logInfo("Credentials have expired, creating new ones now.") + runnable.run() + } else { + logInfo(s"Scheduling login from keytab in $remainingTime millis.") + credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } + } + + // This thread periodically runs on the AM to update the credentials on HDFS. + val credentialRenewerRunnable = + new Runnable { + override def run(): Unit = { + try { + writeNewCredentialsToHDFS(principal, keytab) + cleanupOldFiles() + } catch { + case e: Exception => + // Log the error and try to write new tokens back in an hour + logWarning("Failed to write out new credentials to HDFS, will try again in an " + + "hour! If this happens too often tasks will fail.", e) + credentialRenewer.schedule(this, 1, TimeUnit.HOURS) + return + } + scheduleRenewal(this) + } + } + // Schedule update of credentials. This handles the case of updating the credentials right now + // as well, since the renewal interval will be 0, and the thread will get scheduled + // immediately. + scheduleRenewal(credentialRenewerRunnable) + } + + // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. 
At + // least numFilesToKeep files are kept for safety + private def cleanupOldFiles(): Unit = { + import scala.concurrent.duration._ + try { + val remoteFs = FileSystem.get(freshHadoopConf) + val credentialsPath = new Path(credentialsFile) + val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis + hadoopUtil.listFilesSorted( + remoteFs, credentialsPath.getParent, + credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) + .dropRight(numFilesToKeep) + .takeWhile(_.getModificationTime < thresholdTime) + .foreach(x => remoteFs.delete(x.getPath, true)) + } catch { + // Such errors are not fatal, so don't throw. Make sure they are logged though + case e: Exception => + logWarning("Error while attempting to cleanup old credentials. If you are seeing many " + + "such warnings there may be an issue with your HDFS cluster.", e) + } + } + + private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = { + // Keytab is copied by YARN to the working directory of the AM, so full path is + // not needed. + + // HACK: + // HDFS will not issue new delegation tokens, if the Credentials object + // passed in already has tokens for that FS even if the tokens are expired (it really only + // checks if there are tokens for the service, and not if they are valid). So the only real + // way to get new tokens is to make sure a different Credentials object is used each time to + // get new tokens and then the new tokens are copied over the current user's Credentials. + // So: + // - we login as a different user and get the UGI + // - use that UGI to get the tokens (see doAs block below) + // - copy the tokens over to the current user's credentials (this will overwrite the tokens + // in the current user's Credentials object for this FS). + // The login to KDC happens each time new tokens are required, but this is rare enough to not + // have to worry about (like once every day or so). This makes this code clearer than having + // to login and then relogin every time (the HDFS API may not relogin since we don't use this + // UGI directly for HDFS communication. + logInfo(s"Attempting to login to KDC using principal: $principal") + val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + logInfo("Successfully logged into KDC.") + val tempCreds = keytabLoggedInUGI.getCredentials + val credentialsPath = new Path(credentialsFile) + val dst = credentialsPath.getParent + var nearestNextRenewalTime = Long.MaxValue + keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] { + // Get a copy of the credentials + override def run(): Void = { + nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds) + null + } + }) + + val currTime = System.currentTimeMillis() + val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) { + // If next renewal time is earlier than current time, we set next renewal time to current + // time, this will trigger next renewal immediately. Also set next update time to current + // time. There still has a gap between token renewal and update will potentially introduce + // issue. 
+ logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " + + s"current time ($currTime), which is unexpected, please check your credential renewal " + + "related configurations in the target services.") + timeOfNextRenewal = currTime + currTime + } else { + // Next valid renewal time is about 75% of credential renewal time, and update time is + // slightly later than valid renewal time (80% of renewal time). + timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong + ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong + } + + // Add the temp credentials back to the original ones. + UserGroupInformation.getCurrentUser.addCredentials(tempCreds) + val remoteFs = FileSystem.get(freshHadoopConf) + // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM + // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file + // and update the lastCredentialsFileSuffix. + if (lastCredentialsFileSuffix == 0) { + hadoopUtil.listFilesSorted( + remoteFs, credentialsPath.getParent, + credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) + .lastOption.foreach { status => + lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath) + } + } + val nextSuffix = lastCredentialsFileSuffix + 1 + + val tokenPathStr = + credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + + timeOfNextUpdate.toLong.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + + nextSuffix + val tokenPath = new Path(tokenPathStr) + val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) + + logInfo("Writing out delegation tokens to " + tempTokenPath.toString) + val credentials = UserGroupInformation.getCurrentUser.getCredentials + credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf) + logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr") + remoteFs.rename(tempTokenPath, tokenPath) + logInfo("Delegation token file rename complete.") + lastCredentialsFileSuffix = nextSuffix + } + + def stop(): Unit = { + credentialRenewer.shutdown() + } +}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala new file mode 100644 index 0000000..c4c07b4 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn.security + +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * A ConfigurableCredentialManager to manage all the registered credential providers and offer + * APIs for other modules to obtain credentials as well as renewal time. By default + * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will + * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be + * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]] + * interface and put into resources/META-INF/services to be loaded by ServiceLoader. + * + * Also each credential provider is controlled by + * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false. + */ +private[yarn] final class ConfigurableCredentialManager( + sparkConf: SparkConf, hadoopConf: Configuration) extends Logging { + private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled" + private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled" + + // Maintain all the registered credential providers + private val credentialProviders = { + val providers = ServiceLoader.load(classOf[ServiceCredentialProvider], + Utils.getContextOrSparkClassLoader).asScala + + // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false. 
+ providers.filter { p => + sparkConf.getOption(providerEnabledConfig.format(p.serviceName)) + .orElse { + sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c => + logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " + + s"using ${providerEnabledConfig.format(p.serviceName)} instead") + c + } + }.map(_.toBoolean).getOrElse(true) + }.map { p => (p.serviceName, p) }.toMap + } + + /** + * Get credential provider for the specified service. + */ + def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = { + credentialProviders.get(service) + } + + /** + * Obtain credentials from all the registered providers. + * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable, + * otherwise the nearest renewal time of any credentials will be returned. + */ + def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = { + credentialProviders.values.flatMap { provider => + if (provider.credentialsRequired(hadoopConf)) { + provider.obtainCredentials(hadoopConf, sparkConf, creds) + } else { + logDebug(s"Service ${provider.serviceName} does not require a token." + + s" Check your configuration to see if security is disabled or not.") + None + } + }.foldLeft(Long.MaxValue)(math.min) + } + + /** + * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this + * instance when it is not used. AM will use it to renew credentials periodically. + */ + def credentialRenewer(): AMCredentialRenewer = { + new AMCredentialRenewer(sparkConf, hadoopConf, this) + } + + /** + * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance + * when it is not used. Executors and driver (client mode) will use it to update credentials. + * periodically. + */ + def credentialUpdater(): CredentialUpdater = { + new CredentialUpdater(sparkConf, hadoopConf, this) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala new file mode 100644 index 0000000..5df4fbd --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn.security + +import java.util.concurrent.{Executors, TimeUnit} + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class CredentialUpdater( + sparkConf: SparkConf, + hadoopConf: Configuration, + credentialManager: ConfigurableCredentialManager) extends Logging { + + @volatile private var lastCredentialsFileSuffix = 0 + + private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) + private val freshHadoopConf = + SparkHadoopUtil.get.getConfBypassingFSCache( + hadoopConf, new Path(credentialsFile).toUri.getScheme) + + private val credentialUpdater = + Executors.newSingleThreadScheduledExecutor( + ThreadUtils.namedThreadFactory("Credential Refresh Thread")) + + // This thread wakes up and picks up new credentials from HDFS, if any. + private val credentialUpdaterRunnable = + new Runnable { + override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired()) + } + + /** Start the credential updater task */ + def start(): Unit = { + val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME) + val remainingTime = startTime - System.currentTimeMillis() + if (remainingTime <= 0) { + credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES) + } else { + logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.") + credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS) + } + } + + private def updateCredentialsIfRequired(): Unit = { + val timeToNextUpdate = try { + val credentialsFilePath = new Path(credentialsFile) + val remoteFs = FileSystem.get(freshHadoopConf) + SparkHadoopUtil.get.listFilesSorted( + remoteFs, credentialsFilePath.getParent, + credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) + .lastOption.map { credentialsStatus => + val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath) + if (suffix > lastCredentialsFileSuffix) { + logInfo("Reading new credentials from " + credentialsStatus.getPath) + val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath) + lastCredentialsFileSuffix = suffix + UserGroupInformation.getCurrentUser.addCredentials(newCredentials) + logInfo("Credentials updated from credentials file.") + + val remainingTime = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) + - System.currentTimeMillis() + if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime + } else { + // If current credential file is older than expected, sleep 1 hour and check again. 
+ TimeUnit.HOURS.toMillis(1) + } + }.getOrElse { + // Wait for 1 minute to check again if there's no credential file currently + TimeUnit.MINUTES.toMillis(1) + } + } catch { + // Since the file may get deleted while we are reading it, catch the Exception and come + // back in an hour to try again + case NonFatal(e) => + logWarning("Error while trying to update credentials, will try again in 1 hour", e) + TimeUnit.HOURS.toMillis(1) + } + + credentialUpdater.schedule( + credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS) + } + + private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = { + val stream = remoteFs.open(tokenPath) + try { + val newCredentials = new Credentials() + newCredentials.readTokenStorageStream(stream) + newCredentials + } finally { + stream.close() + } + } + + private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = { + val name = credentialsPath.getName + val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + val slice = name.substring(0, index) + val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + name.substring(last2index + 1, index).toLong + } + + def stop(): Unit = { + credentialUpdater.shutdown() + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala new file mode 100644 index 0000000..5571df0 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + +private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging { + + override def serviceName: String = "hbase" + + override def obtainCredentials( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { + try { + val mirror = universe.runtimeMirror(getClass.getClassLoader) + val obtainToken = mirror.classLoader. + loadClass("org.apache.hadoop.hbase.security.token.TokenUtil"). 
+ getMethod("obtainToken", classOf[Configuration]) + + logDebug("Attempting to fetch HBase security token.") + val token = obtainToken.invoke(null, hbaseConf(hadoopConf)) + .asInstanceOf[Token[_ <: TokenIdentifier]] + logInfo(s"Get token from HBase: ${token.toString}") + creds.addToken(token.getService, token) + } catch { + case NonFatal(e) => + logDebug(s"Failed to get token from service $serviceName", e) + } + + None + } + + override def credentialsRequired(hadoopConf: Configuration): Boolean = { + hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos" + } + + private def hbaseConf(conf: Configuration): Configuration = { + try { + val mirror = universe.runtimeMirror(getClass.getClassLoader) + val confCreate = mirror.classLoader. + loadClass("org.apache.hadoop.hbase.HBaseConfiguration"). + getMethod("create", classOf[Configuration]) + confCreate.invoke(null, conf).asInstanceOf[Configuration] + } catch { + case NonFatal(e) => + logDebug("Fail to invoke HBaseConfiguration", e) + conf + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala new file mode 100644 index 0000000..8d06d73 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn.security + +import java.io.{ByteArrayInputStream, DataInputStream} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +import org.apache.hadoop.mapred.Master +import org.apache.hadoop.security.Credentials + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging { + // Token renewal interval, this value will be set in the first call, + // if None means no token renewer specified, so cannot get token renewal interval. 
+ private var tokenRenewalInterval: Option[Long] = null + + override val serviceName: String = "hdfs" + + override def obtainCredentials( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { + // NameNode to access, used to get tokens from different FileSystems + nnsToAccess(hadoopConf, sparkConf).foreach { dst => + val dstFs = dst.getFileSystem(hadoopConf) + logInfo("getting token for namenode: " + dst) + dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds) + } + + // Get the token renewal interval if it is not set. It will only be called once. + if (tokenRenewalInterval == null) { + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf) + } + + // Get the time of next renewal. + tokenRenewalInterval.map { interval => + creds.getAllTokens.asScala + .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + .map { t => + val identifier = new DelegationTokenIdentifier() + identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + identifier.getIssueDate + interval + }.foldLeft(0L)(math.max) + } + } + + private def getTokenRenewalInterval( + hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = { + // We cannot use the tokens generated with renewer yarn. Trying to renew + // those will fail with an access control issue. So create new tokens with the logged in + // user as renewer. + sparkConf.get(PRINCIPAL).map { renewer => + val creds = new Credentials() + nnsToAccess(hadoopConf, sparkConf).foreach { dst => + val dstFs = dst.getFileSystem(hadoopConf) + dstFs.addDelegationTokens(renewer, creds) + } + val t = creds.getAllTokens.asScala + .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + .head + val newExpiration = t.renew(hadoopConf) + val identifier = new DelegationTokenIdentifier() + identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal Interval is $interval") + interval + } + } + + private def getTokenRenewer(conf: Configuration): String = { + val delegTokenRenewer = Master.getMasterPrincipal(conf) + logDebug("delegation token renewer is: " + delegTokenRenewer) + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new SparkException(errorMessage) + } + + delegTokenRenewer + } + + private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = { + sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet + + sparkConf.get(STAGING_DIR).map(new Path(_)) + .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala new file mode 100644 index 0000000..16d8fc3 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn.security + +import java.lang.reflect.UndeclaredThrowableException +import java.security.PrivilegedExceptionAction + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging { + + override def serviceName: String = "hive" + + private def hiveConf(hadoopConf: Configuration): Configuration = { + try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down + // to a Configuration and used without reflection + val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") + // using the (Configuration, Class) constructor allows the current configuration to be + // included in the hive config. 
+ val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration], + classOf[Object].getClass) + ctor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration] + } catch { + case NonFatal(e) => + logDebug("Fail to create Hive Configuration", e) + hadoopConf + } + } + + override def credentialsRequired(hadoopConf: Configuration): Boolean = { + UserGroupInformation.isSecurityEnabled && + hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty + } + + override def obtainCredentials( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { + val conf = hiveConf(hadoopConf) + + val principalKey = "hive.metastore.kerberos.principal" + val principal = conf.getTrimmed(principalKey, "") + require(principal.nonEmpty, s"Hive principal $principalKey undefined") + val metastoreUri = conf.getTrimmed("hive.metastore.uris", "") + require(metastoreUri.nonEmpty, "Hive metastore uri undefined") + + val currentUser = UserGroupInformation.getCurrentUser() + logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " + + s"$principal at $metastoreUri") + + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") + val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") + val closeCurrent = hiveClass.getMethod("closeCurrent") + + try { + // get all the instance methods before invoking any + val getDelegationToken = hiveClass.getMethod("getDelegationToken", + classOf[String], classOf[String]) + val getHive = hiveClass.getMethod("get", hiveConfClass) + + doAsRealUser { + val hive = getHive.invoke(null, conf) + val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal) + .asInstanceOf[String] + val hive2Token = new Token[DelegationTokenIdentifier]() + hive2Token.decodeFromUrlString(tokenStr) + logInfo(s"Get Token from hive metastore: ${hive2Token.toString}") + creds.addToken(new Text("hive.server2.delegation.token"), hive2Token) + } + } catch { + case NonFatal(e) => + logDebug(s"Fail to get token from service $serviceName", e) + } finally { + Utils.tryLogNonFatalError { + closeCurrent.invoke(null) + } + } + + None + } + + /** + * Run some code as the real logged in user (which may differ from the current user, for + * example, when using proxying). + */ + private def doAsRealUser[T](fn: => T): T = { + val currentUser = UserGroupInformation.getCurrentUser() + val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + + // For some reason the Scala-generated anonymous class ends up causing an + // UndeclaredThrowableException, even if you annotate the method with @throws. 
+ try { + realUser.doAs(new PrivilegedExceptionAction[T]() { + override def run(): T = fn + }) + } catch { + case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala new file mode 100644 index 0000000..4e3fcce --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn.security + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.spark.SparkConf + +/** + * A credential provider for a service. User must implement this if they need to access a + * secure service from Spark. + */ +trait ServiceCredentialProvider { + + /** + * Name of the service to provide credentials. This name should unique, Spark internally will + * use this name to differentiate credential provider. + */ + def serviceName: String + + /** + * To decide whether credential is required for this service. By default it based on whether + * Hadoop security is enabled. + */ + def credentialsRequired(hadoopConf: Configuration): Boolean = { + UserGroupInformation.isSecurityEnabled + } + + /** + * Obtain credentials for this service and get the time of the next renewal. + * @param hadoopConf Configuration of current Hadoop Compatible system. + * @param sparkConf Spark configuration. + * @param creds Credentials to add tokens and security keys to. + * @return If this Credential is renewable and can be renewed, return the time of the next + * renewal, otherwise None should be returned. 
+ */ + def obtainCredentials( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala new file mode 100644 index 0000000..6c3556a --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.Properties + +/** + * Exposes methods from the launcher library that are used by the YARN backend. + */ +private[spark] object YarnCommandBuilderUtils { + + def quoteForBatchScript(arg: String): String = { + CommandBuilderUtils.quoteForBatchScript(arg) + } + + def findJarsDir(sparkHome: String): String = { + val scalaVer = Properties.versionNumberString + .split("\\.") + .take(2) + .mkString(".") + CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true) + } + + /** + * Adds the perm gen configuration to the list of java options if needed and not yet added. + * + * Note that this method adds the option based on the local JVM version; if the node where + * the container is running has a different Java version, there's a risk that the option will + * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use + * Java 7). + */ + def addPermGenSizeOpt(args: ListBuffer[String]): Unit = { + CommandBuilderUtils.addPermGenSizeOpt(args.asJava) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala new file mode 100644 index 0000000..4ed2852 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * An extension service that can be loaded into a Spark YARN scheduler. + * A Service that can be started and stopped. + * + * 1. For implementations to be loadable by `SchedulerExtensionServices`, + * they must provide an empty constructor. + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was + * never invoked. + */ +trait SchedulerExtensionService { + + /** + * Start the extension service. This should be a no-op if + * called more than once. + * @param binding binding to the spark application and YARN + */ + def start(binding: SchedulerExtensionServiceBinding): Unit + + /** + * Stop the service + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was + * never invoked. + */ + def stop(): Unit +} + +/** + * Binding information for a [[SchedulerExtensionService]]. + * + * The attempt ID will be set if the service is started within a YARN application master; + * there is then a different attempt ID for every time that AM is restarted. + * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks + * this information. + * @param sparkContext current spark context + * @param applicationId YARN application ID + * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in + * cluster mode. + */ +case class SchedulerExtensionServiceBinding( + sparkContext: SparkContext, + applicationId: ApplicationId, + attemptId: Option[ApplicationAttemptId] = None) + +/** + * Container for [[SchedulerExtensionService]] instances. + * + * Loads Extension Services from the configuration property + * `"spark.yarn.services"`, instantiates and starts them. + * When stopped, it stops all child entries. + * + * The order in which child extension services are started and stopped + * is undefined. + */ +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService + with Logging { + private var serviceOption: Option[String] = None + private var services: List[SchedulerExtensionService] = Nil + private val started = new AtomicBoolean(false) + private var binding: SchedulerExtensionServiceBinding = _ + + /** + * Binding operation will load the named services and call bind on them too; the + * entire set of services are then ready for `init()` and `start()` calls. 
+ * + * @param binding binding to the spark application and YARN + */ + def start(binding: SchedulerExtensionServiceBinding): Unit = { + if (started.getAndSet(true)) { + logWarning("Ignoring re-entrant start operation") + return + } + require(binding.sparkContext != null, "Null context parameter") + require(binding.applicationId != null, "Null appId parameter") + this.binding = binding + val sparkContext = binding.sparkContext + val appId = binding.applicationId + val attemptId = binding.attemptId + logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId") + + services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass => + val instance = Utils.classForName(sClass) + .newInstance() + .asInstanceOf[SchedulerExtensionService] + // bind this service + instance.start(binding) + logInfo(s"Service $sClass started") + instance + }.toList + } + + /** + * Get the list of services. + * + * @return a list of services; Nil until the service is started + */ + def getServices: List[SchedulerExtensionService] = services + + /** + * Stop the services; idempotent. + * + */ + override def stop(): Unit = { + if (started.getAndSet(false)) { + logInfo(s"Stopping $this") + services.foreach { s => + Utils.tryLogNonFatalError(s.stop()) + } + } + } + + override def toString(): String = s"""SchedulerExtensionServices + |(serviceOption=$serviceOption, + | services=$services, + | started=$started)""".stripMargin +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala new file mode 100644 index 0000000..60da356 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.yarn.api.records.YarnApplicationState + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} +import org.apache.spark.internal.Logging +import org.apache.spark.launcher.SparkAppHandle +import org.apache.spark.scheduler.TaskSchedulerImpl + +private[spark] class YarnClientSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends YarnSchedulerBackend(scheduler, sc) + with Logging { + + private var client: Client = null + private var monitorThread: MonitorThread = null + + /** + * Create a Yarn client to submit an application to the ResourceManager. + * This waits until the application is running. + */ + override def start() { + val driverHost = conf.get("spark.driver.host") + val driverPort = conf.get("spark.driver.port") + val hostport = driverHost + ":" + driverPort + sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) } + + val argsArrayBuf = new ArrayBuffer[String]() + argsArrayBuf += ("--arg", hostport) + + logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) + val args = new ClientArguments(argsArrayBuf.toArray) + totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) + client = new Client(args, conf) + bindToYarn(client.submitApplication(), None) + + // SPARK-8687: Ensure all necessary properties have already been set before + // we initialize our driver scheduler backend, which serves these properties + // to the executors + super.start() + waitForApplication() + + // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver + // reads the credentials from HDFS, just like the executors and updates its own credentials + // cache. + if (conf.contains("spark.yarn.credentials.file")) { + YarnSparkHadoopUtil.get.startCredentialUpdater(conf) + } + monitorThread = asyncMonitorApplication() + monitorThread.start() + } + + /** + * Report the state of the application until it is running. + * If the application has finished, failed or been killed in the process, throw an exception. + * This assumes both `client` and `appId` have already been set. + */ + private def waitForApplication(): Unit = { + assert(client != null && appId.isDefined, "Application has not been submitted yet!") + val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + throw new SparkException("Yarn application has already ended! " + + "It might have been killed or unable to launch application master.") + } + if (state == YarnApplicationState.RUNNING) { + logInfo(s"Application ${appId.get} has started running.") + } + } + + /** + * We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's + * because the SparkContext is being shut down(sc.stop() called by user code), but if + * monitorApplication return, it means the Yarn application finished before sc.stop() was called, + * which means we should call sc.stop() here, and we don't allow the monitor to be interrupted + * before SparkContext stops successfully. 
+ */ + private class MonitorThread extends Thread { + private var allowInterrupt = true + + override def run() { + try { + val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false) + logError(s"Yarn application has already exited with state $state!") + allowInterrupt = false + sc.stop() + } catch { + case e: InterruptedException => logInfo("Interrupting monitor thread") + } + } + + def stopMonitor(): Unit = { + if (allowInterrupt) { + this.interrupt() + } + } + } + + /** + * Monitor the application state in a separate thread. + * If the application has exited for any reason, stop the SparkContext. + * This assumes both `client` and `appId` have already been set. + */ + private def asyncMonitorApplication(): MonitorThread = { + assert(client != null && appId.isDefined, "Application has not been submitted yet!") + val t = new MonitorThread + t.setName("Yarn application state monitor") + t.setDaemon(true) + t + } + + /** + * Stop the scheduler. This assumes `start()` has already been called. + */ + override def stop() { + assert(client != null, "Attempted to stop this scheduler before starting it!") + if (monitorThread != null) { + monitorThread.stopMonitor() + } + + // Report a final state to the launcher if one is connected. This is needed since in client + // mode this backend doesn't let the app monitor loop run to completion, so it does not report + // the final state itself. + // + // Note: there's not enough information at this point to provide a better final state, + // so assume the application was successful. + client.reportLauncherState(SparkAppHandle.State.FINISHED) + + super.stop() + YarnSparkHadoopUtil.get.stopCredentialUpdater() + client.stop() + logInfo("Stopped") + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala new file mode 100644 index 0000000..64cd1bd --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} + +/** + * Cluster Manager for creation of Yarn scheduler and backend + */ +private[spark] class YarnClusterManager extends ExternalClusterManager { + + override def canCreate(masterURL: String): Boolean = { + masterURL == "yarn" + } + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + sc.deployMode match { + case "cluster" => new YarnClusterScheduler(sc) + case "client" => new YarnScheduler(sc) + case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") + } + } + + override def createSchedulerBackend(sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { + sc.deployMode match { + case "cluster" => + new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) + case "client" => + new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) + case _ => + throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") + } + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala new file mode 100644 index 0000000..96c9151 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark._ +import org.apache.spark.deploy.yarn.ApplicationMaster + +/** + * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of + * ApplicationMaster, etc is done + */ +private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { + + logInfo("Created YarnClusterScheduler") + + override def postStartHook() { + ApplicationMaster.sparkContextInitialized(sc) + super.postStartHook() + logInfo("YarnClusterScheduler.postStartHook done") + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala new file mode 100644 index 0000000..4f3d5eb --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.Utils + +private[spark] class YarnClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends YarnSchedulerBackend(scheduler, sc) { + + override def start() { + val attemptId = ApplicationMaster.getAttemptId + bindToYarn(attemptId.getApplicationId(), Some(attemptId)) + super.start() + totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf) + } + + override def getDriverLogUrls: Option[Map[String, String]] = { + var driverLogs: Option[Map[String, String]] = None + try { + val yarnConf = new YarnConfiguration(sc.hadoopConfiguration) + val containerId = YarnSparkHadoopUtil.get.getContainerId + + val httpAddress = System.getenv(Environment.NM_HOST.name()) + + ":" + System.getenv(Environment.NM_HTTP_PORT.name()) + // lookup appropriate http scheme for container log urls + val yarnHttpPolicy = yarnConf.get( + YarnConfiguration.YARN_HTTP_POLICY_KEY, + YarnConfiguration.YARN_HTTP_POLICY_DEFAULT + ) + val user = Utils.getCurrentUserName() + val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" + val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user" + logDebug(s"Base URL for logs: $baseUrl") + driverLogs = Some(Map( + "stdout" -> s"$baseUrl/stdout?start=-4096", + "stderr" -> s"$baseUrl/stderr?start=-4096")) + } catch { + case e: Exception => + logInfo("Error while building AM log links, so AM" + + " logs link will not appear in application UI", e) + } + driverLogs + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala new file mode 100644 index 0000000..0293821 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import org.apache.hadoop.yarn.util.RackResolver +import org.apache.log4j.{Level, Logger} + +import org.apache.spark._ +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.Utils + +private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { + + // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. + if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { + Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) + } + + // By default, rack is unknown + override def getRackForHost(hostPort: String): Option[String] = { + val host = Utils.parseHostPort(hostPort)._1 + Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala new file mode 100644 index 0000000..2f9ea19 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import scala.util.control.NonFatal + +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.rpc._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.ui.JettyUtils +import org.apache.spark.util.{RpcUtils, ThreadUtils} + +/** + * Abstract Yarn scheduler backend that contains common logic + * between the client and cluster Yarn scheduler backends. 
+ */ +private[spark] abstract class YarnSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + + override val minRegisteredRatio = + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 + } else { + super.minRegisteredRatio + } + + protected var totalExpectedExecutors = 0 + + private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv) + + private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint( + YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint) + + private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf) + + /** Application ID. */ + protected var appId: Option[ApplicationId] = None + + /** Attempt ID. This is unset for client-mode schedulers */ + private var attemptId: Option[ApplicationAttemptId] = None + + /** Scheduler extension services. */ + private val services: SchedulerExtensionServices = new SchedulerExtensionServices() + + // Flag to specify whether this schedulerBackend should be reset. + private var shouldResetOnAmRegister = false + + /** + * Bind to YARN. This *must* be done before calling [[start()]]. + * + * @param appId YARN application ID + * @param attemptId Optional YARN attempt ID + */ + protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = { + this.appId = Some(appId) + this.attemptId = attemptId + } + + override def start() { + require(appId.isDefined, "application ID unset") + val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId) + services.start(binding) + super.start() + } + + override def stop(): Unit = { + try { + // SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which + // was Stopped by SchedulerBackend. + requestTotalExecutors(0, 0, Map.empty) + super.stop() + } finally { + services.stop() + } + } + + /** + * Get the attempt ID for this run, if the cluster manager supports multiple + * attempts. Applications run in client mode will not have attempt IDs. + * This attempt ID only includes attempt counter, like "1", "2". + * + * @return The application attempt id, if available. + */ + override def applicationAttemptId(): Option[String] = { + attemptId.map(_.getAttemptId.toString) + } + + /** + * Get an application ID associated with the job. + * This returns the string value of [[appId]] if set, otherwise + * the locally-generated ID from the superclass. + * @return The application ID + */ + override def applicationId(): String = { + appId.map(_.toString).getOrElse { + logWarning("Application ID is not initialized yet.") + super.applicationId + } + } + + /** + * Request executors from the ApplicationMaster by specifying the total number desired. + * This includes executors already pending or running. + */ + override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { + yarnSchedulerEndpointRef.ask[Boolean]( + RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)) + } + + /** + * Request that the ApplicationMaster kill the specified executors. + */ + override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { + yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds)) + } + + override def sufficientResourcesRegistered(): Boolean = { + totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio + } + + /** + * Add filters to the SparkUI. 
+ */ + private def addWebUIFilter( + filterName: String, + filterParams: Map[String, String], + proxyBase: String): Unit = { + if (proxyBase != null && proxyBase.nonEmpty) { + System.setProperty("spark.ui.proxyBase", proxyBase) + } + + val hasFilter = + filterName != null && filterName.nonEmpty && + filterParams != null && filterParams.nonEmpty + if (hasFilter) { + logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase") + conf.set("spark.ui.filters", filterName) + filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) } + scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) } + } + } + + override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { + new YarnDriverEndpoint(rpcEnv, properties) + } + + /** + * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed + * and re-registered itself to driver after a failure. The stale state in driver should be + * cleaned. + */ + override protected def reset(): Unit = { + super.reset() + sc.executorAllocationManager.foreach(_.reset()) + } + + /** + * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected. + * This endpoint communicates with the executors and queries the AM for an executor's exit + * status when the executor is disconnected. + */ + private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) + extends DriverEndpoint(rpcEnv, sparkProperties) { + + /** + * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint + * handles it by assuming the Executor was lost for a bad reason and removes the executor + * immediately. + * + * In YARN's case however it is crucial to talk to the application master and ask why the + * executor had exited. If the executor exited for some reason unrelated to the running tasks + * (e.g., preemption), according to the application master, then we pass that information down + * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should + * not count towards a job failure. + */ + override def onDisconnected(rpcAddress: RpcAddress): Unit = { + addressToExecutorId.get(rpcAddress).foreach { executorId => + if (disableExecutor(executorId)) { + yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress) + } + } + } + } + + /** + * An [[RpcEndpoint]] that communicates with the ApplicationMaster. + */ + private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv) + extends ThreadSafeRpcEndpoint with Logging { + private var amEndpoint: Option[RpcEndpointRef] = None + + private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver( + executorId: String, + executorRpcAddress: RpcAddress): Unit = { + val removeExecutorMessage = amEndpoint match { + case Some(am) => + val lossReasonRequest = GetExecutorLossReason(executorId) + am.ask[ExecutorLossReason](lossReasonRequest, askTimeout) + .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread) + .recover { + case NonFatal(e) => + logWarning(s"Attempted to get executor loss reason" + + s" for executor id ${executorId} at RPC address ${executorRpcAddress}," + + s" but got no response. 
Marking as slave lost.", e) + RemoveExecutor(executorId, SlaveLost()) + }(ThreadUtils.sameThread) + case None => + logWarning("Attempted to check for an executor loss reason" + + " before the AM has registered!") + Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered."))) + } + + removeExecutorMessage + .flatMap { message => + driverEndpoint.ask[Boolean](message) + }(ThreadUtils.sameThread) + .onFailure { + case NonFatal(e) => logError( + s"Error requesting driver to remove executor $executorId after disconnection.", e) + }(ThreadUtils.sameThread) + } + + override def receive: PartialFunction[Any, Unit] = { + case RegisterClusterManager(am) => + logInfo(s"ApplicationMaster registered as $am") + amEndpoint = Option(am) + if (!shouldResetOnAmRegister) { + shouldResetOnAmRegister = true + } else { + // AM is already registered before, this potentially means that AM failed and + // a new one registered after the failure. This will only happen in yarn-client mode. + reset() + } + + case AddWebUIFilter(filterName, filterParams, proxyBase) => + addWebUIFilter(filterName, filterParams, proxyBase) + + case r @ RemoveExecutor(executorId, reason) => + logWarning(reason.toString) + driverEndpoint.ask[Boolean](r).onFailure { + case e => + logError("Error requesting driver to remove executor" + + s" $executorId for reason $reason", e) + }(ThreadUtils.sameThread) + } + + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case r: RequestExecutors => + amEndpoint match { + case Some(am) => + am.ask[Boolean](r).andThen { + case Success(b) => context.reply(b) + case Failure(NonFatal(e)) => + logError(s"Sending $r to AM was unsuccessful", e) + context.sendFailure(e) + }(ThreadUtils.sameThread) + case None => + logWarning("Attempted to request executors before the AM has registered!") + context.reply(false) + } + + case k: KillExecutors => + amEndpoint match { + case Some(am) => + am.ask[Boolean](k).andThen { + case Success(b) => context.reply(b) + case Failure(NonFatal(e)) => + logError(s"Sending $k to AM was unsuccessful", e) + context.sendFailure(e) + }(ThreadUtils.sameThread) + case None => + logWarning("Attempted to kill executors before the AM has registered!") + context.reply(false) + } + + case RetrieveLastAllocatedExecutorId => + context.reply(currentExecutorIdCounter) + } + + override def onDisconnected(remoteAddress: RpcAddress): Unit = { + if (amEndpoint.exists(_.address == remoteAddress)) { + logWarning(s"ApplicationMaster has disassociated: $remoteAddress") + amEndpoint = None + } + } + } +} + +private[spark] object YarnSchedulerBackend { + val ENDPOINT_NAME = "YarnScheduler" +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider new file mode 100644 index 0000000..d0ef5ef --- /dev/null +++ b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider @@ -0,0 +1 @@ +org.apache.spark.deploy.yarn.security.TestCredentialProvider 
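The service file above is a plain java.util.ServiceLoader registration: the ConfigurableCredentialManager introduced earlier in this patch appears to discover ServiceCredentialProvider implementations listed under META-INF/services, so tests (and third-party code) can plug in additional token providers without modifying Spark. Below is a minimal sketch of such a provider; the trait members (serviceName, credentialsRequired, obtainCredentials) are assumed from the rest of this change, and the package, class, and service names are hypothetical, not part of the commit.

package com.example.spark.security  // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical provider; to be picked up it would be listed in
// META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider,
// exactly like TestCredentialProvider above.
class ExampleCredentialProvider extends ServiceCredentialProvider {

  // Short name of the secured service; also used to build per-service config keys
  // such as spark.yarn.security.credentials.example.enabled (assumed naming).
  override def serviceName: String = "example"

  // Whether tokens are needed at all; real providers typically gate this on
  // whether security is enabled for the cluster.
  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  // Obtain delegation tokens, add them to `creds`, and return the expected time
  // (in milliseconds) of the next renewal, if known.
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // A real implementation would fetch tokens here and call creds.addToken(...).
    None
  }
}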
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/test/resources/log4j.properties b/resource-managers/yarn/src/test/resources/log4j.properties new file mode 100644 index 0000000..d13454d --- /dev/null +++ b/resource-managers/yarn/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=DEBUG, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from a few verbose libraries. +log4j.logger.com.sun.jersey=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.spark_project.jetty=WARN
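Stepping back to the YarnClusterManager earlier in this commit: ExternalClusterManager implementations are discovered the same way, through java.util.ServiceLoader, and SparkContext selects the one whose canCreate accepts the master URL ("yarn" here) before asking it for a TaskScheduler and SchedulerBackend matching the deploy mode. The snippet below is only a rough sketch of that resolution step under those assumptions, not the actual SparkContext code.

package org.apache.spark.scheduler

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.SparkException

// Illustrative only: ExternalClusterManager is private[spark], so this sketch sits
// under org.apache.spark.scheduler purely so that it compiles; the real resolution
// logic lives in SparkContext.
object ClusterManagerResolutionSketch {

  def findClusterManager(masterURL: String): ExternalClusterManager = {
    // Collect every ExternalClusterManager registered via
    // META-INF/services/org.apache.spark.scheduler.ExternalClusterManager.
    val candidates = ServiceLoader.load(classOf[ExternalClusterManager])
      .asScala.filter(_.canCreate(masterURL)).toList
    candidates match {
      case cm :: Nil => cm
      case Nil =>
        throw new SparkException(s"Could not find an external cluster manager for '$masterURL'")
      case many =>
        throw new SparkException(
          s"Multiple external cluster managers registered for '$masterURL': $many")
    }
  }
}

For masterURL == "yarn" this resolution would land on the YarnClusterManager above, which then builds YarnClusterScheduler / YarnClusterSchedulerBackend when sc.deployMode is "cluster" and YarnScheduler / YarnClientSchedulerBackend when it is "client".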
