This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 148262f [SPARK-30969][CORE] Remove resource coordination support from
Standalone
148262f is described below
commit 148262f3fbdf7c5da7cd147cf43bf5ebab5f5244
Author: yi.wu <[email protected]>
AuthorDate: Mon Mar 2 11:23:07 2020 -0800
[SPARK-30969][CORE] Remove resource coordination support from Standalone
### What changes were proposed in this pull request?
Remove automatic resource coordination support from Standalone.
### Why are the changes needed?
Resource coordination was mainly designed for the scenario where multiple
workers are launched on the same host. However, that scenario does not actually
arise in today's Spark, because Spark can now start multiple executors in a
single Worker, whereas originally it only allowed one executor per Worker.
So launching multiple workers on the same host no longer helps users. Thus, it
is not worth bringing in a complicated implementation with a potentially high
maintenance [...]
### Does this PR introduce any user-facing change?
No. This is a Spark 3.0 feature, so it is removed before ever reaching users.
### How was this patch tested?
Pass Jenkins.
Closes #27722 from Ngone51/abandon_coordination.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
(cherry picked from commit b517f991fe0c95a186872d38be6a2091d9326195)
Signed-off-by: Xingbo Jiang <[email protected]>
---
.gitignore | 1 -
.../main/scala/org/apache/spark/SparkContext.scala | 25 +-
.../spark/deploy/StandaloneResourceUtils.scala | 263 +--------------------
.../org/apache/spark/deploy/worker/Worker.scala | 27 +--
.../org/apache/spark/internal/config/package.scala | 17 --
.../main/scala/org/apache/spark/util/Utils.scala | 22 --
.../scala/org/apache/spark/SparkContextSuite.scala | 6 +-
.../apache/spark/deploy/worker/WorkerSuite.scala | 74 +-----
.../resource/ResourceDiscoveryPluginSuite.scala | 4 -
docs/configuration.md | 27 +--
docs/spark-standalone.md | 10 +-
11 files changed, 17 insertions(+), 459 deletions(-)
diff --git a/.gitignore b/.gitignore
index 798e8ac..198fdee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -72,7 +72,6 @@ scalastyle-on-compile.generated.xml
scalastyle-output.xml
scalastyle.txt
spark-*-bin-*.tgz
-spark-resources/
spark-tests.log
src_managed/
streaming-tests.log
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 91188d5..bcbb7e4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat
=> NewFileInputFor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.deploy.StandaloneResourceUtils._
import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat,
PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
@@ -250,15 +249,6 @@ class SparkContext(config: SparkConf) extends Logging {
def isLocal: Boolean = Utils.isLocalMaster(_conf)
- private def isClientStandalone: Boolean = {
- val isSparkCluster = master match {
- case SparkMasterRegex.SPARK_REGEX(_) => true
- case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true
- case _ => false
- }
- deployMode == "client" && isSparkCluster
- }
-
/**
* @return true if context is stopped or in the midst of stopping.
*/
@@ -396,17 +386,7 @@ class SparkContext(config: SparkConf) extends Logging {
_driverLogger = DriverLogger(_conf)
val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)
- val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX,
resourcesFileOpt)
- _resources = {
- // driver submitted in client mode under Standalone may have conflicting
resources with
- // other drivers/workers on this host. We should sync driver's resources
info into
- // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision.
- if (isClientStandalone) {
- acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources,
Utils.getProcessId)
- } else {
- allResources
- }
- }
+ _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX,
resourcesFileOpt)
logResourceInfo(SPARK_DRIVER_PREFIX, _resources)
// log out spark.app.name in the Spark driver logs
@@ -2019,9 +1999,6 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
_progressBar.foreach(_.stop())
}
- if (isClientStandalone) {
- releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources,
Utils.getProcessId)
- }
_taskScheduler = null
// TODO: Cache.stop()?
if (_env != null) {
diff --git
a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
index 65bf435..e08709e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
@@ -17,30 +17,21 @@
package org.apache.spark.deploy
-import java.io.{File, RandomAccessFile}
-import java.nio.channels.{FileLock, OverlappingFileLockException}
+import java.io.File
import java.nio.file.Files
import scala.collection.mutable
-import scala.util.Random
import scala.util.control.NonFatal
import org.json4s.{DefaultFormats, Extraction}
-import org.json4s.jackson.JsonMethods.{compact, parse, render}
+import org.json4s.jackson.JsonMethods.{compact, render}
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SPARK_RESOURCES_COORDINATE,
SPARK_RESOURCES_DIR}
import org.apache.spark.resource.{ResourceAllocation, ResourceID,
ResourceInformation, ResourceRequirement}
-import org.apache.spark.resource.ResourceUtils.{parseResourceRequirements,
withResourcesJson}
import org.apache.spark.util.Utils
private[spark] object StandaloneResourceUtils extends Logging {
- // These directory/files are used to coordinate the resources between
- // the drivers/workers on the host in Spark Standalone.
- val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources"
- val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json"
- val RESOURCES_LOCK_FILE = "__allocated_resources__.lock"
/**
* A mutable resource information which provides more efficient modification
on addresses.
@@ -87,249 +78,6 @@ private[spark] object StandaloneResourceUtils extends
Logging {
}
/**
- * Assigns (if coordinate needed) resources to workers/drivers from the same
host to avoid
- * address conflict.
- *
- * This function works in three steps. First, acquiring the lock on
RESOURCES_LOCK_FILE
- * to achieve synchronization among workers and drivers. Second, getting all
allocated
- * resources from ALLOCATED_RESOURCES_FILE and assigning isolated resources
to the worker
- * or driver after differentiating available resources in discovered
resources from
- * allocated resources. If available resources don't meet worker's or
driver's requirement,
- * try to update allocated resources by excluding the resource allocation if
its related
- * process has already terminated and do the assignment again. If still
don't meet requirement,
- * exception should be thrown. Third, updating ALLOCATED_RESOURCES_FILE with
new allocated
- * resources along with pid for the worker or driver. Then, return allocated
resources
- * information after releasing the lock.
- *
- * @param conf SparkConf
- * @param componentName spark.driver / spark.worker
- * @param resources the resources found by worker/driver on the host
- * @param pid the process id of worker/driver to acquire resources.
- * @return allocated resources for the worker/driver or throws exception if
can't
- * meet worker/driver's requirement
- */
- def acquireResources(
- conf: SparkConf,
- componentName: String,
- resources: Map[String, ResourceInformation],
- pid: Int)
- : Map[String, ResourceInformation] = {
- if (!needCoordinate(conf)) {
- return resources
- }
- val resourceRequirements = parseResourceRequirements(conf, componentName)
- if (resourceRequirements.isEmpty) {
- return Map.empty
- }
- val lock = acquireLock(conf)
- try {
- val resourcesFile = new File(getOrCreateResourcesDir(conf),
ALLOCATED_RESOURCES_FILE)
- // all allocated resources in ALLOCATED_RESOURCES_FILE, can be updated
if any allocations'
- // related processes detected to be terminated while checking pids below.
- var origAllocation = Seq.empty[StandaloneResourceAllocation]
- // Map[pid -> Map[resourceName -> Addresses[]]]
- var allocated = {
- if (resourcesFile.exists()) {
- origAllocation = allocatedStandaloneResources(resourcesFile.getPath)
- val allocations = origAllocation.map { resource =>
- val resourceMap = {
- resource.allocations.map { allocation =>
- allocation.id.resourceName -> allocation.addresses.toArray
- }.toMap
- }
- resource.pid -> resourceMap
- }.toMap
- allocations
- } else {
- Map.empty[Int, Map[String, Array[String]]]
- }
- }
-
- // new allocated resources for worker or driver,
- // map from resource name to its allocated addresses.
- var newAssignments: Map[String, Array[String]] = null
- // Whether we've checked process status and we'll only do the check at
most once.
- // Do the check iff the available resources can't meet the requirements
at the first time.
- var checked = false
- // Whether we need to keep allocating for the worker/driver and we'll
only go through
- // the loop at most twice.
- var keepAllocating = true
- while (keepAllocating) {
- keepAllocating = false
- // store the pid whose related allocated resources conflict with
- // discovered resources passed in.
- val pidsToCheck = mutable.Set[Int]()
- newAssignments = resourceRequirements.map { req =>
- val rName = req.resourceName
- val amount = req.amount
- // initially, we must have available.length >= amount as we've done
pre-check previously
- var available = resources(rName).addresses
- // gets available resource addresses by excluding all
- // allocated resource addresses from discovered resources
- allocated.foreach { a =>
- val thePid = a._1
- val resourceMap = a._2
- val assigned = resourceMap.getOrElse(rName, Array.empty)
- val retained = available.diff(assigned)
- // if len(retained) < len(available) after differ to assigned,
then, there must be
- // some conflicting resources addresses between available and
assigned. So, we should
- // store its pid here to check whether it's alive in case we don't
find enough
- // resources after traversal all allocated resources.
- if (retained.length < available.length && !checked) {
- pidsToCheck += thePid
- }
- if (retained.length >= amount) {
- available = retained
- } else if (checked) {
- keepAllocating = false
- throw new SparkException(s"No more resources available since
they've already" +
- s" assigned to other workers/drivers.")
- } else {
- keepAllocating = true
- }
- }
- val assigned = {
- if (keepAllocating) { // can't meet the requirement
- // excludes the allocation whose related process has already
been terminated.
- val (invalid, valid) = allocated.partition { a =>
- pidsToCheck(a._1) && !(Utils.isTesting ||
Utils.isProcessRunning(a._1))}
- allocated = valid
- origAllocation = origAllocation.filter(
- allocation => !invalid.contains(allocation.pid))
- checked = true
- // note this is a meaningless return value, just to avoid
creating any new object
- available
- } else {
- available.take(amount)
- }
- }
- rName -> assigned
- }.toMap
- }
- val newAllocation = {
- val allocations = newAssignments.map { case (rName, addresses) =>
- ResourceAllocation(new ResourceID(componentName, rName), addresses)
- }.toSeq
- StandaloneResourceAllocation(pid, allocations)
- }
- writeResourceAllocationJson(
- componentName, origAllocation ++ Seq(newAllocation), resourcesFile)
- newAllocation.toResourceInformationMap
- } finally {
- releaseLock(lock)
- }
- }
-
- /**
- * Frees (if coordinate needed) all the resources a worker/driver (pid) has
in one shot
- * to make those resources be available for other workers/drivers on the
same host.
- * @param conf SparkConf
- * @param componentName spark.driver / spark.worker
- * @param toRelease the resources expected to release
- * @param pid the process id of worker/driver to release resources.
- */
- def releaseResources(
- conf: SparkConf,
- componentName: String,
- toRelease: Map[String, ResourceInformation],
- pid: Int)
- : Unit = {
- if (!needCoordinate(conf)) {
- return
- }
- if (toRelease != null && toRelease.nonEmpty) {
- val lock = acquireLock(conf)
- try {
- val resourcesFile = new File(getOrCreateResourcesDir(conf),
ALLOCATED_RESOURCES_FILE)
- if (resourcesFile.exists()) {
- val (target, others) =
-
allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid)
- if (target.nonEmpty) {
- if (others.isEmpty) {
- if (!resourcesFile.delete()) {
- logError(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
- }
- } else {
- writeResourceAllocationJson(componentName, others, resourcesFile)
- }
- logDebug(s"$componentName(pid=$pid) released resources:
${toRelease.mkString("\n")}")
- } else {
- logWarning(s"$componentName(pid=$pid) has already released its
resources.")
- }
- }
- } finally {
- releaseLock(lock)
- }
- }
- }
-
- private def acquireLock(conf: SparkConf): FileLock = {
- val resourcesDir = getOrCreateResourcesDir(conf)
- val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE)
- val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel
- var keepTry = true
- var lock: FileLock = null
- while (keepTry) {
- try {
- lock = lockFileChannel.lock()
- logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.")
- keepTry = false
- } catch {
- case e: OverlappingFileLockException =>
- // This exception throws when we're in LocalSparkCluster mode.
FileLock is designed
- // to be used across JVMs, but our LocalSparkCluster is designed to
launch multiple
- // workers in the same JVM. As a result, when an worker in
LocalSparkCluster try to
- // acquire the lock on `resources.lock` which already locked by
other worker, we'll
- // hit this exception. So, we should manually control it.
- keepTry = true
- // there may be multiple workers race for the lock,
- // so, sleep for a random time to avoid possible conflict
- val duration = Random.nextInt(1000) + 1000
- Thread.sleep(duration)
- }
- }
- assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.")
- lock
- }
-
- private def releaseLock(lock: FileLock): Unit = {
- try {
- lock.release()
- lock.channel().close()
- logInfo(s"Released lock on $RESOURCES_LOCK_FILE.")
- } catch {
- case e: Exception =>
- logError(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e)
- }
- }
-
- private def getOrCreateResourcesDir(conf: SparkConf): File = {
- val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse {
- val sparkHome = if (Utils.isTesting) {
- assert(sys.props.contains("spark.test.home") ||
- sys.env.contains("SPARK_HOME"), "spark.test.home or SPARK_HOME is
not set.")
- sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
- } else {
- sys.env.getOrElse("SPARK_HOME", ".")
- }
- sparkHome
- })
- val resourceDir = new File(coordinateDir, SPARK_RESOURCES_COORDINATE_DIR)
- if (!resourceDir.exists()) {
- Utils.createDirectory(resourceDir)
- }
- resourceDir
- }
-
- private def allocatedStandaloneResources(resourcesFile: String)
- : Seq[StandaloneResourceAllocation] = {
- withResourcesJson[StandaloneResourceAllocation](resourcesFile) { json =>
- implicit val formats = DefaultFormats
- parse(json).extract[Seq[StandaloneResourceAllocation]]
- }
- }
-
- /**
* Save the allocated resources of driver(cluster only) or executor into a
JSON formatted
* resources file. Used in Standalone only.
* @param componentName spark.driver / spark.executor
@@ -372,11 +120,6 @@ private[spark] object StandaloneResourceUtils extends
Logging {
Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
}
- /** Whether needs to coordinate resources among workers and drivers for user
*/
- def needCoordinate(conf: SparkConf): Boolean = {
- conf.get(SPARK_RESOURCES_COORDINATE)
- }
-
def toMutable(immutableResources: Map[String, ResourceInformation])
: Map[String, MutableResourceInfo] = {
immutableResources.map { case (rName, rInfo) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4be495a..73cac880 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.{Command, ExecutorDescription,
ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.StandaloneResourceUtils._
-import org.apache.spark.deploy.master.{DriverState, Master, WorkerResourceInfo}
+import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Tests.IS_TESTING
@@ -57,8 +57,7 @@ private[deploy] class Worker(
val conf: SparkConf,
val securityMgr: SecurityManager,
resourceFileOpt: Option[String] = None,
- externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
- pid: Int = Utils.getProcessId)
+ externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
extends ThreadSafeRpcEndpoint with Logging {
private val host = rpcEnv.address.host
@@ -205,7 +204,6 @@ private[deploy] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
startExternalShuffleService()
- releaseResourcesOnInterrupt()
setupWorkerResources()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
@@ -219,26 +217,13 @@ private[deploy] class Worker(
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
- /**
- * Used to catch the TERM signal from sbin/stop-slave.sh and
- * release resources before Worker exits
- */
- private def releaseResourcesOnInterrupt(): Unit = {
- SignalUtils.register("TERM") {
- releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
- false
- }
- }
-
private def setupWorkerResources(): Unit = {
try {
- val allResources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX,
resourceFileOpt)
- resources = acquireResources(conf, SPARK_WORKER_PREFIX, allResources,
pid)
+ resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX,
resourceFileOpt)
logResourceInfo(SPARK_WORKER_PREFIX, resources)
} catch {
case e: Exception =>
logError("Failed to setup worker resources: ", e)
- releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
if (!Utils.isTesting) {
System.exit(1)
}
@@ -373,7 +358,6 @@ private[deploy] class Worker(
TimeUnit.SECONDS))
}
} else {
- releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
logError("All masters are unresponsive! Giving up.")
System.exit(1)
}
@@ -472,7 +456,6 @@ private[deploy] class Worker(
case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
- releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
System.exit(1)
}
@@ -738,7 +721,6 @@ private[deploy] class Worker(
}
override def onStop(): Unit = {
- releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
cleanupThreadExecutor.shutdownNow()
metricsSystem.report()
cancelLastRegistrationRetry()
@@ -875,9 +857,8 @@ private[deploy] object Worker extends Logging {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
- val pid = if (Utils.isTesting) workerNumber.get else Utils.getProcessId
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores,
memory,
- masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr,
resourceFileOpt, pid = pid))
+ masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr,
resourceFileOpt))
rpcEnv
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 37ce178..23c31a5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -37,23 +37,6 @@ package object config {
private[spark] val SPARK_TASK_PREFIX = "spark.task"
private[spark] val LISTENER_BUS_EVENT_QUEUE_PREFIX =
"spark.scheduler.listenerbus.eventqueue"
- private[spark] val SPARK_RESOURCES_COORDINATE =
- ConfigBuilder("spark.resources.coordinateResourcesInStandalone")
- .doc("Whether to coordinate resources automatically among
workers/drivers(client only) " +
- "in Standalone. If false, the user is responsible for configuring
different resources " +
- "for workers/drivers that run on the same host.")
- .booleanConf
- .createWithDefault(true)
-
- private[spark] val SPARK_RESOURCES_DIR =
- ConfigBuilder("spark.resources.dir")
- .doc("Directory used to coordinate resources among
workers/drivers(client only) in " +
- "Standalone. Default is SPARK_HOME. Make sure to use the same
directory for worker " +
- "and drivers in client mode that run on the same host. Don't clean up
this directory " +
- "while workers/drivers are still alive to avoid the most likely
resources conflict. ")
- .stringConf
- .createOptional
-
private[spark] val RESOURCES_DISCOVERY_PLUGIN =
ConfigBuilder("spark.resources.discoveryPlugin")
.doc("Comma-separated list of class names implementing" +
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index dde4323..9f332ba 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2547,28 +2547,6 @@ private[spark] object Utils extends Logging {
}
/**
- * Given a process id, return true if the process is still running.
- */
- def isProcessRunning(pid: Int): Boolean = {
- val process = executeCommand(Seq("kill", "-0", pid.toString))
- process.waitFor(10, TimeUnit.SECONDS)
- process.exitValue() == 0
- }
-
- /**
- * Returns the pid of this JVM process.
- */
- def getProcessId: Int = {
- val PROCESS = "(\\d+)@(.*)".r
- val name = getProcessName()
- name match {
- case PROCESS(pid, _) => pid.toInt
- case _ =>
- throw new SparkException(s"Unexpected process name: $name, expected to
be PID@hostname.")
- }
- }
-
- /**
* Returns the name of this JVM process. This is OS dependent but typically
(OSX, Linux, Windows),
* this is formatted as PID@hostname.
*/
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index df9c7c5..9f8fa89 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -841,7 +841,6 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
.setAppName("test-cluster")
.set(DRIVER_GPU_ID.amountConf, "3")
.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
- .set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)
@@ -924,7 +923,7 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
assume(!(Utils.isWindows))
withTempDir { dir =>
val discoveryScript = createTempScriptWithExpectedOutput(dir,
"resourceDiscoveryScript",
- """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7",
"8"]}""")
+ """{"name": "gpu","addresses":["0", "1", "2"]}""")
val conf = new SparkConf()
.setMaster("local-cluster[3, 1, 1024]")
@@ -933,7 +932,6 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
.set(TASK_GPU_ID.amountConf, "3")
.set(EXECUTOR_GPU_ID.amountConf, "3")
- .set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)
@@ -945,7 +943,7 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
context.resources().get(GPU).get.addresses.iterator
}
val gpus = rdd.collect()
- assert(gpus.sorted === Seq("0", "1", "2", "3", "4", "5", "6", "7", "8"))
+ assert(gpus.sorted === Seq("0", "0", "0", "1", "1", "1", "2", "2", "2"))
eventually(timeout(10.seconds)) {
assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum
== 0)
diff --git
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index bb541b4..2d3d0af 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.{SecurityManager, SparkConf,
SparkFunSuite}
import org.apache.spark.TestUtils.{createTempJsonFile,
createTempScriptWithExpectedOutput}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged,
ExecutorStateChanged, WorkDirCleanup}
-import
org.apache.spark.deploy.StandaloneResourceUtils.{ALLOCATED_RESOURCES_FILE,
SPARK_RESOURCES_COORDINATE_DIR}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker._
@@ -64,7 +63,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
private def makeWorker(
conf: SparkConf = new SparkConf(),
shuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
- pid: Int = Utils.getProcessId,
local: Boolean = false): Worker = {
assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
val securityMgr = new SecurityManager(conf)
@@ -72,7 +70,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
val resourcesFile = conf.get(SPARK_WORKER_RESOURCE_FILE)
val localWorker = new Worker(rpcEnv, 50000, 20, 1234 * 5,
Array.fill(1)(RpcAddress("1.2.3.4", 1234)), "Worker", "/tmp",
- conf, securityMgr, resourcesFile, shuffleServiceSupplier, pid)
+ conf, securityMgr, resourcesFile, shuffleServiceSupplier)
if (local) {
localWorker
} else {
@@ -81,14 +79,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
}
}
- private def assertResourcesFileDeleted(): Unit = {
- assert(sys.props.contains("spark.test.home"))
- val sparkHome = sys.props.get("spark.test.home")
- val resourceFile = new File(sparkHome + "/" +
SPARK_RESOURCES_COORDINATE_DIR,
- ALLOCATED_RESOURCES_FILE)
- assert(!resourceFile.exists())
- }
-
before {
MockitoAnnotations.initMocks(this)
}
@@ -251,7 +241,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
worker.rpcEnv.shutdown()
worker.rpcEnv.awaitTermination()
}
- assertResourcesFileDeleted()
}
test("worker could load resources from resources file while launching") {
@@ -273,7 +262,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
worker.rpcEnv.shutdown()
worker.rpcEnv.awaitTermination()
}
- assertResourcesFileDeleted()
}
}
@@ -292,7 +280,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
worker.rpcEnv.shutdown()
worker.rpcEnv.awaitTermination()
}
- assertResourcesFileDeleted()
}
}
@@ -316,65 +303,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with
BeforeAndAfter {
worker.rpcEnv.shutdown()
worker.rpcEnv.awaitTermination()
}
- assertResourcesFileDeleted()
- }
- }
-
- test("Workers run on the same host should avoid resources conflict when
coordinate is on") {
- val conf = new SparkConf()
- withTempDir { dir =>
- val scriptPath = createTempScriptWithExpectedOutput(dir,
"fpgaDiscoverScript",
- """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
- conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
- conf.set(WORKER_FPGA_ID.amountConf, "2")
- val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local =
true))
- workers.zipWithIndex.foreach{case (w, i) =>
w.rpcEnv.setupEndpoint(s"worker$i", w)}
- eventually(timeout(20.seconds)) {
- val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
- assert(empty.length === 1)
- assert(nonEmpty.length === 2)
- val totalResources =
nonEmpty.flatMap(_.resources(FPGA).addresses).toSet.toSeq.sorted
- assert(totalResources === Seq("f1", "f2", "f3", "f4"))
- workers.foreach(_.rpcEnv.shutdown())
- workers.foreach(_.rpcEnv.awaitTermination())
- }
- assertResourcesFileDeleted()
- }
- }
-
- test("Workers run on the same host should load resources naively when
coordinate is off") {
- val conf = new SparkConf()
- // disable coordination
- conf.set(config.SPARK_RESOURCES_COORDINATE, false)
- withTempDir { dir =>
- val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("g0", "g1"))
- val ja = Extraction.decompose(Seq(gpuArgs))
- val resourcesPath = createTempJsonFile(dir, "resources", ja)
- val scriptPath = createTempScriptWithExpectedOutput(dir,
"fpgaDiscoverScript",
- """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
- conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
- conf.set(WORKER_GPU_ID.amountConf, "2")
- conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
- conf.set(WORKER_FPGA_ID.amountConf, "2")
- val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local =
true))
- workers.zipWithIndex.foreach{case (w, i) =>
w.rpcEnv.setupEndpoint(s"worker$i", w)}
- eventually(timeout(20.seconds)) {
- val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
- assert(empty.length === 0)
- assert(nonEmpty.length === 3)
- // Each Worker should get the same resources from resources file and
discovery script
- // without coordination. Note that, normally, we must config different
resources
- // for workers run on the same host when coordinate config is off.
Test here is used
- // to validate the different behaviour comparing to the above test
when coordinate config
- // is on, so we admit the resources collision here.
- nonEmpty.foreach { worker =>
- assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
- FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3",
"f4", "f5"))))
- }
- workers.foreach(_.rpcEnv.shutdown())
- workers.foreach(_.rpcEnv.awaitTermination())
- }
- assertResourcesFileDeleted()
}
}
diff --git
a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
index 437c903..ff7d680 100644
---
a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
+++
b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
@@ -50,7 +50,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with
LocalSparkContext
.set(WORKER_GPU_ID.amountConf, "2")
.set(TASK_GPU_ID.amountConf, "1")
.set(EXECUTOR_GPU_ID.amountConf, "1")
- .set(SPARK_RESOURCES_DIR, dir.getName())
.set(WORKER_FPGA_ID.amountConf, "2")
.set(TASK_FPGA_ID.amountConf, "1")
.set(EXECUTOR_FPGA_ID.amountConf, "1")
@@ -81,7 +80,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with
LocalSparkContext
.set(WORKER_GPU_ID.amountConf, "2")
.set(TASK_GPU_ID.amountConf, "1")
.set(EXECUTOR_GPU_ID.amountConf, "1")
- .set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
@@ -108,7 +106,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite
with LocalSparkContext
.set(WORKER_GPU_ID.amountConf, "2")
.set(TASK_GPU_ID.amountConf, "1")
.set(EXECUTOR_GPU_ID.amountConf, "1")
- .set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
@@ -134,7 +131,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite
with LocalSparkContext
.set(RESOURCES_DISCOVERY_PLUGIN,
Seq(classOf[TestResourceDiscoveryPluginEmpty].getName()))
.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
.set(DRIVER_GPU_ID.amountConf, "2")
- .set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
diff --git a/docs/configuration.md b/docs/configuration.md
index 469feed..5e6fe93 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -194,25 +194,6 @@ of the most common options to set are:
</td>
</tr>
<tr>
- <td><code>spark.resources.coordinateResourcesInStandalone</code></td>
- <td>true</td>
- <td>
- Whether to coordinate resources automatically among workers/drivers(client
only)
- in Standalone. If false, the user is responsible for configuring different
resources
- for workers/drivers that run on the same host.
- </td>
-</tr>
-<tr>
- <td><code>spark.resources.dir</code></td>
- <td>SPARK_HOME</td>
- <td>
- Directory used to coordinate resources among workers/drivers(client only)
in Standalone.
- Default is SPARK_HOME. Make sure to use the same directory for worker and
drivers in
- client mode that run on the same host. Don't clean up this directory while
workers/drivers
- are still alive to avoid the most likely resources conflict.
- </td>
-</tr>
-<tr>
<td><code>spark.driver.resource.{resourceName}.amount</code></td>
<td>0</td>
<td>
@@ -228,9 +209,8 @@ of the most common options to set are:
<td>
A script for the driver to run to discover a particular resource type.
This should
write to STDOUT a JSON string in the format of the ResourceInformation
class. This has a
- name and an array of addresses. For a client-submitted driver in
Standalone, discovery
- script must assign different resource addresses to this driver comparing
to workers' and
- other drivers' when
<code>spark.resources.coordinateResourcesInStandalone</code> is off.
+ name and an array of addresses. For a client-submitted driver, the discovery
script must assign
+ this driver resource addresses that differ from those used by other drivers on
the same host.
</td>
</tr>
<tr>
@@ -2755,5 +2735,4 @@ There are configurations available to request resources
for the driver: <code>sp
Spark will use the configurations specified to first request containers with
the corresponding resources from the cluster manager. Once it gets the
container, Spark launches an Executor in that container which will discover
what resources the container has and the addresses associated with each
resource. The Executor will register with the Driver and report back the
resources available to that Executor. The Spark scheduler can then schedule
tasks to each Executor and assign specific reso [...]
-See your cluster manager specific page for requirements and details on each of
- [YARN](running-on-yarn.html#resource-allocation-and-configuration-overview),
[Kubernetes](running-on-kubernetes.html#resource-allocation-and-configuration-overview)
and [Standalone
Mode](spark-standalone.html#resource-allocation-and-configuration-overview). It
is currently not available with Mesos or local mode. If using local-cluster
mode see the Spark Standalone documentation but be aware only a single wor [...]
-
+See your cluster manager specific page for requirements and details on each of
- [YARN](running-on-yarn.html#resource-allocation-and-configuration-overview),
[Kubernetes](running-on-kubernetes.html#resource-allocation-and-configuration-overview)
and [Standalone
Mode](spark-standalone.html#resource-allocation-and-configuration-overview). It
is currently not available with Mesos or local mode. Also note that
local-cluster mode with multiple workers is not supported (see Standalon [...]
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index ff310ed..17b6772 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -256,8 +256,6 @@ SPARK_MASTER_OPTS supports the following system properties:
<td>
Path to resource discovery script, which is used to find a particular
resource while worker starting up.
And the output of the script should be formatted like the
<code>ResourceInformation</code> class.
- When <code>spark.resources.coordinateResourcesInStandalone</code> is off,
the discovery script must assign different
- resources for workers and drivers in client mode that run on the same host
to avoid resource conflict.
</td>
</tr>
<tr>
@@ -266,9 +264,7 @@ SPARK_MASTER_OPTS supports the following system properties:
<td>
Path to resources file which is used to find various resources while
worker starting up.
The content of resources file should be formatted like <code>
- [[{"id":{"componentName":
"spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]]</code>.
- When <code>spark.resources.coordinateResourcesInStandalone</code> is off,
resources file must assign different
- resources for workers and drivers in client mode that run on the same host
to avoid resource conflict.
+ [{"id":{"componentName":
"spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]</code>.
If a particular resource is not found in the resources file, the discovery
script would be used to
find that resource. If the discovery script also does not find the
resources, the worker will fail
to start up.
@@ -346,9 +342,9 @@ Please make sure to have read the Custom Resource
Scheduling and Configuration O
Spark Standalone has 2 parts, the first is configuring the resources for the
Worker, the second is the resource allocation for a specific application.
-The user must configure the Workers to have a set of resources available so
that it can assign them out to Executors. The
<code>spark.worker.resource.{resourceName}.amount</code> is used to control the
amount of each resource the worker has allocated. The user must also specify
either <code>spark.worker.resourcesFile</code> or
<code>spark.worker.resource.{resourceName}.discoveryScript</code> to specify
how the Worker discovers the resources its assigned. See the descriptions above
for ea [...]
+The user must configure the Workers to have a set of resources available so
that it can assign them out to Executors. The
<code>spark.worker.resource.{resourceName}.amount</code> is used to control the
amount of each resource the worker has allocated. The user must also specify
either <code>spark.worker.resourcesFile</code> or
<code>spark.worker.resource.{resourceName}.discoveryScript</code> to specify
how the Worker discovers the resources its assigned. See the descriptions above
for ea [...]
-The second part is running an application on Spark Standalone. The only
special case from the standard Spark resource configs is when you are running
the Driver in client mode. For a Driver in client mode, the user can specify
the resources it uses via <code>spark.driver.resourcesfile</code> or
<code>spark.driver.resources.{resourceName}.discoveryScript</code>. If the
Driver is running on the same host as other Drivers or Workers there are 2 ways
to make sure the they don't use the same [...]
+The second part is running an application on Spark Standalone. The only
special case from the standard Spark resource configs is when you are running
the Driver in client mode. For a Driver in client mode, the user can specify
the resources it uses via <code>spark.driver.resourcesFile</code> or
<code>spark.driver.resource.{resourceName}.discoveryScript</code>. If the
Driver is running on the same host as other Drivers, please make sure the
resources file or discovery script only returns [...]
Note, the user does not need to specify a discovery script when submitting an
application as the Worker will start each Executor with the resources it
allocates to it.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]