This is an automated email from the ASF dual-hosted git repository. jiangxb1987 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 148262f [SPARK-30969][CORE] Remove resource coordination support from Standalone 148262f is described below commit 148262f3fbdf7c5da7cd147cf43bf5ebab5f5244 Author: yi.wu <yi...@databricks.com> AuthorDate: Mon Mar 2 11:23:07 2020 -0800 [SPARK-30969][CORE] Remove resource coordination support from Standalone ### What changes were proposed in this pull request? Remove automatic resource coordination support from Standalone. ### Why are the changes needed? Resource coordination is mainly designed for the scenario where multiple workers are launched on the same host. However, this is actually a non-existent scenario for today's Spark, because Spark can now start multiple executors in a single Worker, whereas it only allowed one executor per Worker in the very beginning. So launching multiple workers on the same host no longer helps users. Thus, it's not worth bringing over a complicated implementation and potentially high maintenance [...] ### Does this PR introduce any user-facing change? No, it's a Spark 3.0 feature. ### How was this patch tested? Pass Jenkins. Closes #27722 from Ngone51/abandon_coordination. 
Authored-by: yi.wu <yi...@databricks.com> Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com> (cherry picked from commit b517f991fe0c95a186872d38be6a2091d9326195) Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com> --- .gitignore | 1 - .../main/scala/org/apache/spark/SparkContext.scala | 25 +- .../spark/deploy/StandaloneResourceUtils.scala | 263 +-------------------- .../org/apache/spark/deploy/worker/Worker.scala | 27 +-- .../org/apache/spark/internal/config/package.scala | 17 -- .../main/scala/org/apache/spark/util/Utils.scala | 22 -- .../scala/org/apache/spark/SparkContextSuite.scala | 6 +- .../apache/spark/deploy/worker/WorkerSuite.scala | 74 +----- .../resource/ResourceDiscoveryPluginSuite.scala | 4 - docs/configuration.md | 27 +-- docs/spark-standalone.md | 10 +- 11 files changed, 17 insertions(+), 459 deletions(-) diff --git a/.gitignore b/.gitignore index 798e8ac..198fdee 100644 --- a/.gitignore +++ b/.gitignore @@ -72,7 +72,6 @@ scalastyle-on-compile.generated.xml scalastyle-output.xml scalastyle.txt spark-*-bin-*.tgz -spark-resources/ spark-tests.log src_managed/ streaming-tests.log diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 91188d5..bcbb7e4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} -import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} import org.apache.spark.internal.Logging @@ -250,15 +249,6 @@ class 
SparkContext(config: SparkConf) extends Logging { def isLocal: Boolean = Utils.isLocalMaster(_conf) - private def isClientStandalone: Boolean = { - val isSparkCluster = master match { - case SparkMasterRegex.SPARK_REGEX(_) => true - case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true - case _ => false - } - deployMode == "client" && isSparkCluster - } - /** * @return true if context is stopped or in the midst of stopping. */ @@ -396,17 +386,7 @@ class SparkContext(config: SparkConf) extends Logging { _driverLogger = DriverLogger(_conf) val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE) - val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) - _resources = { - // driver submitted in client mode under Standalone may have conflicting resources with - // other drivers/workers on this host. We should sync driver's resources info into - // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision. - if (isClientStandalone) { - acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, Utils.getProcessId) - } else { - allResources - } - } + _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) logResourceInfo(SPARK_DRIVER_PREFIX, _resources) // log out spark.app.name in the Spark driver logs @@ -2019,9 +1999,6 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _progressBar.foreach(_.stop()) } - if (isClientStandalone) { - releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, Utils.getProcessId) - } _taskScheduler = null // TODO: Cache.stop()? 
if (_env != null) { diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala index 65bf435..e08709e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -17,30 +17,21 @@ package org.apache.spark.deploy -import java.io.{File, RandomAccessFile} -import java.nio.channels.{FileLock, OverlappingFileLockException} +import java.io.File import java.nio.file.Files import scala.collection.mutable -import scala.util.Random import scala.util.control.NonFatal import org.json4s.{DefaultFormats, Extraction} -import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.json4s.jackson.JsonMethods.{compact, render} -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{SPARK_RESOURCES_COORDINATE, SPARK_RESOURCES_DIR} import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement} -import org.apache.spark.resource.ResourceUtils.{parseResourceRequirements, withResourcesJson} import org.apache.spark.util.Utils private[spark] object StandaloneResourceUtils extends Logging { - // These directory/files are used to coordinate the resources between - // the drivers/workers on the host in Spark Standalone. - val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources" - val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json" - val RESOURCES_LOCK_FILE = "__allocated_resources__.lock" /** * A mutable resource information which provides more efficient modification on addresses. @@ -87,249 +78,6 @@ private[spark] object StandaloneResourceUtils extends Logging { } /** - * Assigns (if coordinate needed) resources to workers/drivers from the same host to avoid - * address conflict. 
- * - * This function works in three steps. First, acquiring the lock on RESOURCES_LOCK_FILE - * to achieve synchronization among workers and drivers. Second, getting all allocated - * resources from ALLOCATED_RESOURCES_FILE and assigning isolated resources to the worker - * or driver after differentiating available resources in discovered resources from - * allocated resources. If available resources don't meet worker's or driver's requirement, - * try to update allocated resources by excluding the resource allocation if its related - * process has already terminated and do the assignment again. If still don't meet requirement, - * exception should be thrown. Third, updating ALLOCATED_RESOURCES_FILE with new allocated - * resources along with pid for the worker or driver. Then, return allocated resources - * information after releasing the lock. - * - * @param conf SparkConf - * @param componentName spark.driver / spark.worker - * @param resources the resources found by worker/driver on the host - * @param pid the process id of worker/driver to acquire resources. - * @return allocated resources for the worker/driver or throws exception if can't - * meet worker/driver's requirement - */ - def acquireResources( - conf: SparkConf, - componentName: String, - resources: Map[String, ResourceInformation], - pid: Int) - : Map[String, ResourceInformation] = { - if (!needCoordinate(conf)) { - return resources - } - val resourceRequirements = parseResourceRequirements(conf, componentName) - if (resourceRequirements.isEmpty) { - return Map.empty - } - val lock = acquireLock(conf) - try { - val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE) - // all allocated resources in ALLOCATED_RESOURCES_FILE, can be updated if any allocations' - // related processes detected to be terminated while checking pids below. 
- var origAllocation = Seq.empty[StandaloneResourceAllocation] - // Map[pid -> Map[resourceName -> Addresses[]]] - var allocated = { - if (resourcesFile.exists()) { - origAllocation = allocatedStandaloneResources(resourcesFile.getPath) - val allocations = origAllocation.map { resource => - val resourceMap = { - resource.allocations.map { allocation => - allocation.id.resourceName -> allocation.addresses.toArray - }.toMap - } - resource.pid -> resourceMap - }.toMap - allocations - } else { - Map.empty[Int, Map[String, Array[String]]] - } - } - - // new allocated resources for worker or driver, - // map from resource name to its allocated addresses. - var newAssignments: Map[String, Array[String]] = null - // Whether we've checked process status and we'll only do the check at most once. - // Do the check iff the available resources can't meet the requirements at the first time. - var checked = false - // Whether we need to keep allocating for the worker/driver and we'll only go through - // the loop at most twice. - var keepAllocating = true - while (keepAllocating) { - keepAllocating = false - // store the pid whose related allocated resources conflict with - // discovered resources passed in. - val pidsToCheck = mutable.Set[Int]() - newAssignments = resourceRequirements.map { req => - val rName = req.resourceName - val amount = req.amount - // initially, we must have available.length >= amount as we've done pre-check previously - var available = resources(rName).addresses - // gets available resource addresses by excluding all - // allocated resource addresses from discovered resources - allocated.foreach { a => - val thePid = a._1 - val resourceMap = a._2 - val assigned = resourceMap.getOrElse(rName, Array.empty) - val retained = available.diff(assigned) - // if len(retained) < len(available) after differ to assigned, then, there must be - // some conflicting resources addresses between available and assigned. 
So, we should - // store its pid here to check whether it's alive in case we don't find enough - // resources after traversal all allocated resources. - if (retained.length < available.length && !checked) { - pidsToCheck += thePid - } - if (retained.length >= amount) { - available = retained - } else if (checked) { - keepAllocating = false - throw new SparkException(s"No more resources available since they've already" + - s" assigned to other workers/drivers.") - } else { - keepAllocating = true - } - } - val assigned = { - if (keepAllocating) { // can't meet the requirement - // excludes the allocation whose related process has already been terminated. - val (invalid, valid) = allocated.partition { a => - pidsToCheck(a._1) && !(Utils.isTesting || Utils.isProcessRunning(a._1))} - allocated = valid - origAllocation = origAllocation.filter( - allocation => !invalid.contains(allocation.pid)) - checked = true - // note this is a meaningless return value, just to avoid creating any new object - available - } else { - available.take(amount) - } - } - rName -> assigned - }.toMap - } - val newAllocation = { - val allocations = newAssignments.map { case (rName, addresses) => - ResourceAllocation(new ResourceID(componentName, rName), addresses) - }.toSeq - StandaloneResourceAllocation(pid, allocations) - } - writeResourceAllocationJson( - componentName, origAllocation ++ Seq(newAllocation), resourcesFile) - newAllocation.toResourceInformationMap - } finally { - releaseLock(lock) - } - } - - /** - * Frees (if coordinate needed) all the resources a worker/driver (pid) has in one shot - * to make those resources be available for other workers/drivers on the same host. - * @param conf SparkConf - * @param componentName spark.driver / spark.worker - * @param toRelease the resources expected to release - * @param pid the process id of worker/driver to release resources. 
- */ - def releaseResources( - conf: SparkConf, - componentName: String, - toRelease: Map[String, ResourceInformation], - pid: Int) - : Unit = { - if (!needCoordinate(conf)) { - return - } - if (toRelease != null && toRelease.nonEmpty) { - val lock = acquireLock(conf) - try { - val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE) - if (resourcesFile.exists()) { - val (target, others) = - allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid) - if (target.nonEmpty) { - if (others.isEmpty) { - if (!resourcesFile.delete()) { - logError(s"Failed to delete $ALLOCATED_RESOURCES_FILE.") - } - } else { - writeResourceAllocationJson(componentName, others, resourcesFile) - } - logDebug(s"$componentName(pid=$pid) released resources: ${toRelease.mkString("\n")}") - } else { - logWarning(s"$componentName(pid=$pid) has already released its resources.") - } - } - } finally { - releaseLock(lock) - } - } - } - - private def acquireLock(conf: SparkConf): FileLock = { - val resourcesDir = getOrCreateResourcesDir(conf) - val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE) - val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel - var keepTry = true - var lock: FileLock = null - while (keepTry) { - try { - lock = lockFileChannel.lock() - logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.") - keepTry = false - } catch { - case e: OverlappingFileLockException => - // This exception throws when we're in LocalSparkCluster mode. FileLock is designed - // to be used across JVMs, but our LocalSparkCluster is designed to launch multiple - // workers in the same JVM. As a result, when an worker in LocalSparkCluster try to - // acquire the lock on `resources.lock` which already locked by other worker, we'll - // hit this exception. So, we should manually control it. 
- keepTry = true - // there may be multiple workers race for the lock, - // so, sleep for a random time to avoid possible conflict - val duration = Random.nextInt(1000) + 1000 - Thread.sleep(duration) - } - } - assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.") - lock - } - - private def releaseLock(lock: FileLock): Unit = { - try { - lock.release() - lock.channel().close() - logInfo(s"Released lock on $RESOURCES_LOCK_FILE.") - } catch { - case e: Exception => - logError(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e) - } - } - - private def getOrCreateResourcesDir(conf: SparkConf): File = { - val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse { - val sparkHome = if (Utils.isTesting) { - assert(sys.props.contains("spark.test.home") || - sys.env.contains("SPARK_HOME"), "spark.test.home or SPARK_HOME is not set.") - sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME")) - } else { - sys.env.getOrElse("SPARK_HOME", ".") - } - sparkHome - }) - val resourceDir = new File(coordinateDir, SPARK_RESOURCES_COORDINATE_DIR) - if (!resourceDir.exists()) { - Utils.createDirectory(resourceDir) - } - resourceDir - } - - private def allocatedStandaloneResources(resourcesFile: String) - : Seq[StandaloneResourceAllocation] = { - withResourcesJson[StandaloneResourceAllocation](resourcesFile) { json => - implicit val formats = DefaultFormats - parse(json).extract[Seq[StandaloneResourceAllocation]] - } - } - - /** * Save the allocated resources of driver(cluster only) or executor into a JSON formatted * resources file. Used in Standalone only. 
* @param componentName spark.driver / spark.executor @@ -372,11 +120,6 @@ private[spark] object StandaloneResourceUtils extends Logging { Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes()) } - /** Whether needs to coordinate resources among workers and drivers for user */ - def needCoordinate(conf: SparkConf): Boolean = { - conf.get(SPARK_RESOURCES_COORDINATE) - } - def toMutable(immutableResources: Map[String, ResourceInformation]) : Map[String, MutableResourceInfo] = { immutableResources.map { case (rName, rInfo) => diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 4be495a..73cac880 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -34,7 +34,7 @@ import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.ExternalShuffleService import org.apache.spark.deploy.StandaloneResourceUtils._ -import org.apache.spark.deploy.master.{DriverState, Master, WorkerResourceInfo} +import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Tests.IS_TESTING @@ -57,8 +57,7 @@ private[deploy] class Worker( val conf: SparkConf, val securityMgr: SecurityManager, resourceFileOpt: Option[String] = None, - externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null, - pid: Int = Utils.getProcessId) + externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null) extends ThreadSafeRpcEndpoint with Logging { private val host = rpcEnv.address.host @@ -205,7 +204,6 @@ private[deploy] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() startExternalShuffleService() - releaseResourcesOnInterrupt() 
setupWorkerResources() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() @@ -219,26 +217,13 @@ private[deploy] class Worker( metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } - /** - * Used to catch the TERM signal from sbin/stop-slave.sh and - * release resources before Worker exits - */ - private def releaseResourcesOnInterrupt(): Unit = { - SignalUtils.register("TERM") { - releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) - false - } - } - private def setupWorkerResources(): Unit = { try { - val allResources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt) - resources = acquireResources(conf, SPARK_WORKER_PREFIX, allResources, pid) + resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt) logResourceInfo(SPARK_WORKER_PREFIX, resources) } catch { case e: Exception => logError("Failed to setup worker resources: ", e) - releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) if (!Utils.isTesting) { System.exit(1) } @@ -373,7 +358,6 @@ private[deploy] class Worker( TimeUnit.SECONDS)) } } else { - releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) logError("All masters are unresponsive! 
Giving up.") System.exit(1) } @@ -472,7 +456,6 @@ private[deploy] class Worker( case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) - releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) System.exit(1) } @@ -738,7 +721,6 @@ private[deploy] class Worker( } override def onStop(): Unit = { - releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid) cleanupThreadExecutor.shutdownNow() metricsSystem.report() cancelLastRegistrationRetry() @@ -875,9 +857,8 @@ private[deploy] object Worker extends Logging { val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL) - val pid = if (Utils.isTesting) workerNumber.get else Utils.getProcessId rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, - masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt, pid = pid)) + masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt)) rpcEnv } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 37ce178..23c31a5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -37,23 +37,6 @@ package object config { private[spark] val SPARK_TASK_PREFIX = "spark.task" private[spark] val LISTENER_BUS_EVENT_QUEUE_PREFIX = "spark.scheduler.listenerbus.eventqueue" - private[spark] val SPARK_RESOURCES_COORDINATE = - ConfigBuilder("spark.resources.coordinateResourcesInStandalone") - .doc("Whether to coordinate resources automatically among workers/drivers(client only) " + - "in Standalone. 
If false, the user is responsible for configuring different resources " + - "for workers/drivers that run on the same host.") - .booleanConf - .createWithDefault(true) - - private[spark] val SPARK_RESOURCES_DIR = - ConfigBuilder("spark.resources.dir") - .doc("Directory used to coordinate resources among workers/drivers(client only) in " + - "Standalone. Default is SPARK_HOME. Make sure to use the same directory for worker " + - "and drivers in client mode that run on the same host. Don't clean up this directory " + - "while workers/drivers are still alive to avoid the most likely resources conflict. ") - .stringConf - .createOptional - private[spark] val RESOURCES_DISCOVERY_PLUGIN = ConfigBuilder("spark.resources.discoveryPlugin") .doc("Comma-separated list of class names implementing" + diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index dde4323..9f332ba 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2547,28 +2547,6 @@ private[spark] object Utils extends Logging { } /** - * Given a process id, return true if the process is still running. - */ - def isProcessRunning(pid: Int): Boolean = { - val process = executeCommand(Seq("kill", "-0", pid.toString)) - process.waitFor(10, TimeUnit.SECONDS) - process.exitValue() == 0 - } - - /** - * Returns the pid of this JVM process. - */ - def getProcessId: Int = { - val PROCESS = "(\\d+)@(.*)".r - val name = getProcessName() - name match { - case PROCESS(pid, _) => pid.toInt - case _ => - throw new SparkException(s"Unexpected process name: $name, expected to be PID@hostname.") - } - } - - /** * Returns the name of this JVM process. This is OS dependent but typically (OSX, Linux, Windows), * this is formatted as PID@hostname. 
*/ diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index df9c7c5..9f8fa89 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -841,7 +841,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu .setAppName("test-cluster") .set(DRIVER_GPU_ID.amountConf, "3") .set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath) - .set(SPARK_RESOURCES_DIR, dir.getName()) sc = new SparkContext(conf) @@ -924,7 +923,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assume(!(Utils.isWindows)) withTempDir { dir => val discoveryScript = createTempScriptWithExpectedOutput(dir, "resourceDiscoveryScript", - """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", "8"]}""") + """{"name": "gpu","addresses":["0", "1", "2"]}""") val conf = new SparkConf() .setMaster("local-cluster[3, 1, 1024]") @@ -933,7 +932,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript) .set(TASK_GPU_ID.amountConf, "3") .set(EXECUTOR_GPU_ID.amountConf, "3") - .set(SPARK_RESOURCES_DIR, dir.getName()) sc = new SparkContext(conf) @@ -945,7 +943,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu context.resources().get(GPU).get.addresses.iterator } val gpus = rdd.collect() - assert(gpus.sorted === Seq("0", "1", "2", "3", "4", "5", "6", "7", "8")) + assert(gpus.sorted === Seq("0", "0", "0", "1", "1", "1", "2", "2", "2")) eventually(timeout(10.seconds)) { assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index bb541b4..2d3d0af 100644 --- 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -36,7 +36,6 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.TestUtils.{createTempJsonFile, createTempScriptWithExpectedOutput} import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService} import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup} -import org.apache.spark.deploy.StandaloneResourceUtils.{ALLOCATED_RESOURCES_FILE, SPARK_RESOURCES_COORDINATE_DIR} import org.apache.spark.deploy.master.DriverState import org.apache.spark.internal.config import org.apache.spark.internal.config.Worker._ @@ -64,7 +63,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { private def makeWorker( conf: SparkConf = new SparkConf(), shuffleServiceSupplier: Supplier[ExternalShuffleService] = null, - pid: Int = Utils.getProcessId, local: Boolean = false): Worker = { assert(_worker === null, "Some Worker's RpcEnv is leaked in tests") val securityMgr = new SecurityManager(conf) @@ -72,7 +70,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { val resourcesFile = conf.get(SPARK_WORKER_RESOURCE_FILE) val localWorker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)), "Worker", "/tmp", - conf, securityMgr, resourcesFile, shuffleServiceSupplier, pid) + conf, securityMgr, resourcesFile, shuffleServiceSupplier) if (local) { localWorker } else { @@ -81,14 +79,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { } } - private def assertResourcesFileDeleted(): Unit = { - assert(sys.props.contains("spark.test.home")) - val sparkHome = sys.props.get("spark.test.home") - val resourceFile = new File(sparkHome + "/" + SPARK_RESOURCES_COORDINATE_DIR, - ALLOCATED_RESOURCES_FILE) - assert(!resourceFile.exists()) - } - before { 
MockitoAnnotations.initMocks(this) } @@ -251,7 +241,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { worker.rpcEnv.shutdown() worker.rpcEnv.awaitTermination() } - assertResourcesFileDeleted() } test("worker could load resources from resources file while launching") { @@ -273,7 +262,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { worker.rpcEnv.shutdown() worker.rpcEnv.awaitTermination() } - assertResourcesFileDeleted() } } @@ -292,7 +280,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { worker.rpcEnv.shutdown() worker.rpcEnv.awaitTermination() } - assertResourcesFileDeleted() } } @@ -316,65 +303,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { worker.rpcEnv.shutdown() worker.rpcEnv.awaitTermination() } - assertResourcesFileDeleted() - } - } - - test("Workers run on the same host should avoid resources conflict when coordinate is on") { - val conf = new SparkConf() - withTempDir { dir => - val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", - """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""") - conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath) - conf.set(WORKER_FPGA_ID.amountConf, "2") - val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = true)) - workers.zipWithIndex.foreach{case (w, i) => w.rpcEnv.setupEndpoint(s"worker$i", w)} - eventually(timeout(20.seconds)) { - val (empty, nonEmpty) = workers.partition(_.resources.isEmpty) - assert(empty.length === 1) - assert(nonEmpty.length === 2) - val totalResources = nonEmpty.flatMap(_.resources(FPGA).addresses).toSet.toSeq.sorted - assert(totalResources === Seq("f1", "f2", "f3", "f4")) - workers.foreach(_.rpcEnv.shutdown()) - workers.foreach(_.rpcEnv.awaitTermination()) - } - assertResourcesFileDeleted() - } - } - - test("Workers run on the same host should load resources naively when coordinate is off") { - val conf = new 
SparkConf() - // disable coordination - conf.set(config.SPARK_RESOURCES_COORDINATE, false) - withTempDir { dir => - val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("g0", "g1")) - val ja = Extraction.decompose(Seq(gpuArgs)) - val resourcesPath = createTempJsonFile(dir, "resources", ja) - val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", - """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""") - conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath) - conf.set(WORKER_GPU_ID.amountConf, "2") - conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath) - conf.set(WORKER_FPGA_ID.amountConf, "2") - val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = true)) - workers.zipWithIndex.foreach{case (w, i) => w.rpcEnv.setupEndpoint(s"worker$i", w)} - eventually(timeout(20.seconds)) { - val (empty, nonEmpty) = workers.partition(_.resources.isEmpty) - assert(empty.length === 0) - assert(nonEmpty.length === 3) - // Each Worker should get the same resources from resources file and discovery script - // without coordination. Note that, normally, we must config different resources - // for workers run on the same host when coordinate config is off. Test here is used - // to validate the different behaviour comparing to the above test when coordinate config - // is on, so we admit the resources collision here. 
- nonEmpty.foreach { worker => - assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation, - FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3", "f4", "f5")))) - } - workers.foreach(_.rpcEnv.shutdown()) - workers.foreach(_.rpcEnv.awaitTermination()) - } - assertResourcesFileDeleted() } } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala index 437c903..ff7d680 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala @@ -50,7 +50,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with LocalSparkContext .set(WORKER_GPU_ID.amountConf, "2") .set(TASK_GPU_ID.amountConf, "1") .set(EXECUTOR_GPU_ID.amountConf, "1") - .set(SPARK_RESOURCES_DIR, dir.getName()) .set(WORKER_FPGA_ID.amountConf, "2") .set(TASK_FPGA_ID.amountConf, "1") .set(EXECUTOR_FPGA_ID.amountConf, "1") @@ -81,7 +80,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with LocalSparkContext .set(WORKER_GPU_ID.amountConf, "2") .set(TASK_GPU_ID.amountConf, "1") .set(EXECUTOR_GPU_ID.amountConf, "1") - .set(SPARK_RESOURCES_DIR, dir.getName()) sc = new SparkContext(conf) TestUtils.waitUntilExecutorsUp(sc, 2, 60000) @@ -108,7 +106,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with LocalSparkContext .set(WORKER_GPU_ID.amountConf, "2") .set(TASK_GPU_ID.amountConf, "1") .set(EXECUTOR_GPU_ID.amountConf, "1") - .set(SPARK_RESOURCES_DIR, dir.getName()) sc = new SparkContext(conf) TestUtils.waitUntilExecutorsUp(sc, 2, 60000) @@ -134,7 +131,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with LocalSparkContext .set(RESOURCES_DISCOVERY_PLUGIN, Seq(classOf[TestResourceDiscoveryPluginEmpty].getName())) .set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath) .set(DRIVER_GPU_ID.amountConf, "2") - 
.set(SPARK_RESOURCES_DIR, dir.getName()) sc = new SparkContext(conf) TestUtils.waitUntilExecutorsUp(sc, 2, 60000) diff --git a/docs/configuration.md b/docs/configuration.md index 469feed..5e6fe93 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -194,25 +194,6 @@ of the most common options to set are: </td> </tr> <tr> - <td><code>spark.resources.coordinateResourcesInStandalone</code></td> - <td>true</td> - <td> - Whether to coordinate resources automatically among workers/drivers(client only) - in Standalone. If false, the user is responsible for configuring different resources - for workers/drivers that run on the same host. - </td> -</tr> -<tr> - <td><code>spark.resources.dir</code></td> - <td>SPARK_HOME</td> - <td> - Directory used to coordinate resources among workers/drivers(client only) in Standalone. - Default is SPARK_HOME. Make sure to use the same directory for worker and drivers in - client mode that run on the same host. Don't clean up this directory while workers/drivers - are still alive to avoid the most likely resources conflict. - </td> -</tr> -<tr> <td><code>spark.driver.resource.{resourceName}.amount</code></td> <td>0</td> <td> @@ -228,9 +209,8 @@ of the most common options to set are: <td> A script for the driver to run to discover a particular resource type. This should write to STDOUT a JSON string in the format of the ResourceInformation class. This has a - name and an array of addresses. For a client-submitted driver in Standalone, discovery - script must assign different resource addresses to this driver comparing to workers' and - other drivers' when <code>spark.resources.coordinateResourcesInStandalone</code> is off. + name and an array of addresses. For a client-submitted driver, discovery script must assign + different resource addresses to this driver comparing to other drivers on the same host. 
</td> </tr> <tr> @@ -2755,5 +2735,4 @@ There are configurations available to request resources for the driver: <code>sp Spark will use the configurations specified to first request containers with the corresponding resources from the cluster manager. Once it gets the container, Spark launches an Executor in that container which will discover what resources the container has and the addresses associated with each resource. The Executor will register with the Driver and report back the resources available to that Executor. The Spark scheduler can then schedule tasks to each Executor and assign specific reso [...] -See your cluster manager specific page for requirements and details on each of - [YARN](running-on-yarn.html#resource-allocation-and-configuration-overview), [Kubernetes](running-on-kubernetes.html#resource-allocation-and-configuration-overview) and [Standalone Mode](spark-standalone.html#resource-allocation-and-configuration-overview). It is currently not available with Mesos or local mode. If using local-cluster mode see the Spark Standalone documentation but be aware only a single wor [...] - +See your cluster manager specific page for requirements and details on each of - [YARN](running-on-yarn.html#resource-allocation-and-configuration-overview), [Kubernetes](running-on-kubernetes.html#resource-allocation-and-configuration-overview) and [Standalone Mode](spark-standalone.html#resource-allocation-and-configuration-overview). It is currently not available with Mesos or local mode. And please also note that local-cluster mode with multiple workers is not supported(see Standalon [...] diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ff310ed..17b6772 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -256,8 +256,6 @@ SPARK_MASTER_OPTS supports the following system properties: <td> Path to resource discovery script, which is used to find a particular resource while worker starting up. 
And the output of the script should be formatted like the <code>ResourceInformation</code> class. - When <code>spark.resources.coordinateResourcesInStandalone</code> is off, the discovery script must assign different - resources for workers and drivers in client mode that run on the same host to avoid resource conflict. </td> </tr> <tr> @@ -266,9 +264,7 @@ SPARK_MASTER_OPTS supports the following system properties: <td> Path to resources file which is used to find various resources while worker starting up. The content of resources file should be formatted like <code> - [[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]]</code>. - When <code>spark.resources.coordinateResourcesInStandalone</code> is off, resources file must assign different - resources for workers and drivers in client mode that run on the same host to avoid resource conflict. + [{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]</code>. If a particular resource is not found in the resources file, the discovery script would be used to find that resource. If the discovery script also does not find the resources, the worker will fail to start up. @@ -346,9 +342,9 @@ Please make sure to have read the Custom Resource Scheduling and Configuration O Spark Standalone has 2 parts, the first is configuring the resources for the Worker, the second is the resource allocation for a specific application. -The user must configure the Workers to have a set of resources available so that it can assign them out to Executors. The <code>spark.worker.resource.{resourceName}.amount</code> is used to control the amount of each resource the worker has allocated. The user must also specify either <code>spark.worker.resourcesFile</code> or <code>spark.worker.resource.{resourceName}.discoveryScript</code> to specify how the Worker discovers the resources its assigned. See the descriptions above for ea [...] 
+The user must configure the Workers to have a set of resources available so that it can assign them out to Executors. The <code>spark.worker.resource.{resourceName}.amount</code> is used to control the amount of each resource the worker has allocated. The user must also specify either <code>spark.worker.resourcesFile</code> or <code>spark.worker.resource.{resourceName}.discoveryScript</code> to specify how the Worker discovers the resources its assigned. See the descriptions above for ea [...] -The second part is running an application on Spark Standalone. The only special case from the standard Spark resource configs is when you are running the Driver in client mode. For a Driver in client mode, the user can specify the resources it uses via <code>spark.driver.resourcesfile</code> or <code>spark.driver.resources.{resourceName}.discoveryScript</code>. If the Driver is running on the same host as other Drivers or Workers there are 2 ways to make sure the they don't use the same [...] +The second part is running an application on Spark Standalone. The only special case from the standard Spark resource configs is when you are running the Driver in client mode. For a Driver in client mode, the user can specify the resources it uses via <code>spark.driver.resourcesfile</code> or <code>spark.driver.resource.{resourceName}.discoveryScript</code>. If the Driver is running on the same host as other Drivers, please make sure the resources file or discovery script only returns [...] Note, the user does not need to specify a discovery script when submitting an application as the Worker will start each Executor with the resources it allocates to it. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org