This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 148262f  [SPARK-30969][CORE] Remove resource coordination support from Standalone
148262f is described below

commit 148262f3fbdf7c5da7cd147cf43bf5ebab5f5244
Author: yi.wu <yi...@databricks.com>
AuthorDate: Mon Mar 2 11:23:07 2020 -0800

    [SPARK-30969][CORE] Remove resource coordination support from Standalone
    
    ### What changes were proposed in this pull request?
    
    Remove automatic resource coordination support from Standalone.
    
    ### Why are the changes needed?
    
    Resource coordination was mainly designed for the scenario where multiple
    workers are launched on the same host. However, that scenario effectively no
    longer exists in today's Spark: a single Worker can now start multiple
    executors, whereas it only allowed one executor per Worker in the very
    beginning. So launching multiple workers on the same host no longer helps
    users, and it is not worth carrying over the complicated implementation and
    the potential high maintain [...]
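    
    As an aside (a minimal sketch, not part of this patch; it only uses the standard standalone
    configs `spark.executor.cores` and `spark.executor.memory`, and the master URL is a
    placeholder): once the per-executor size is capped, a single Worker already hosts several
    executors, which is exactly why per-host coordination across multiple Workers is no longer
    needed.
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    // Sketch: against one 8-core / 16g standalone Worker, this application gets up to
    // four 2-core / 4g executors from that single Worker, so running several Workers
    // on the same host just to get several executors is unnecessary.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077") // placeholder master URL
      .setAppName("multi-executors-per-worker-sketch")
      .set("spark.executor.cores", "2")
      .set("spark.executor.memory", "4g")
    val sc = new SparkContext(conf)
    ```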
    
    ### Does this PR introduce any user-facing change?
    
    No, it's a Spark 3.0 feature.
    
    ### How was this patch tested?
    
    Pass Jenkins.
    
    Closes #27722 from Ngone51/abandon_coordination.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    (cherry picked from commit b517f991fe0c95a186872d38be6a2091d9326195)
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .gitignore                                         |   1 -
 .../main/scala/org/apache/spark/SparkContext.scala |  25 +-
 .../spark/deploy/StandaloneResourceUtils.scala     | 263 +--------------------
 .../org/apache/spark/deploy/worker/Worker.scala    |  27 +--
 .../org/apache/spark/internal/config/package.scala |  17 --
 .../main/scala/org/apache/spark/util/Utils.scala   |  22 --
 .../scala/org/apache/spark/SparkContextSuite.scala |   6 +-
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  74 +-----
 .../resource/ResourceDiscoveryPluginSuite.scala    |   4 -
 docs/configuration.md                              |  27 +--
 docs/spark-standalone.md                           |  10 +-
 11 files changed, 17 insertions(+), 459 deletions(-)

diff --git a/.gitignore b/.gitignore
index 798e8ac..198fdee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -72,7 +72,6 @@ scalastyle-on-compile.generated.xml
 scalastyle-output.xml
 scalastyle.txt
 spark-*-bin-*.tgz
-spark-resources/
 spark-tests.log
 src_managed/
 streaming-tests.log
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 91188d5..bcbb7e4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat 
=> NewFileInputFor
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, 
PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
@@ -250,15 +249,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
   def isLocal: Boolean = Utils.isLocalMaster(_conf)
 
-  private def isClientStandalone: Boolean = {
-    val isSparkCluster = master match {
-      case SparkMasterRegex.SPARK_REGEX(_) => true
-      case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true
-      case _ => false
-    }
-    deployMode == "client" && isSparkCluster
-  }
-
   /**
    * @return true if context is stopped or in the midst of stopping.
    */
@@ -396,17 +386,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _driverLogger = DriverLogger(_conf)
 
     val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)
-    val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, 
resourcesFileOpt)
-    _resources = {
-      // driver submitted in client mode under Standalone may have conflicting 
resources with
-      // other drivers/workers on this host. We should sync driver's resources 
info into
-      // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision.
-      if (isClientStandalone) {
-        acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, 
Utils.getProcessId)
-      } else {
-        allResources
-      }
-    }
+    _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, 
resourcesFileOpt)
     logResourceInfo(SPARK_DRIVER_PREFIX, _resources)
 
     // log out spark.app.name in the Spark driver logs
@@ -2019,9 +1999,6 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _progressBar.foreach(_.stop())
     }
-    if (isClientStandalone) {
-      releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, 
Utils.getProcessId)
-    }
     _taskScheduler = null
     // TODO: Cache.stop()?
     if (_env != null) {
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
index 65bf435..e08709e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
@@ -17,30 +17,21 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, RandomAccessFile}
-import java.nio.channels.{FileLock, OverlappingFileLockException}
+import java.io.File
 import java.nio.file.Files
 
 import scala.collection.mutable
-import scala.util.Random
 import scala.util.control.NonFatal
 
 import org.json4s.{DefaultFormats, Extraction}
-import org.json4s.jackson.JsonMethods.{compact, parse, render}
+import org.json4s.jackson.JsonMethods.{compact, render}
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SPARK_RESOURCES_COORDINATE, 
SPARK_RESOURCES_DIR}
 import org.apache.spark.resource.{ResourceAllocation, ResourceID, 
ResourceInformation, ResourceRequirement}
-import org.apache.spark.resource.ResourceUtils.{parseResourceRequirements, 
withResourcesJson}
 import org.apache.spark.util.Utils
 
 private[spark] object StandaloneResourceUtils extends Logging {
-  // These directory/files are used to coordinate the resources between
-  // the drivers/workers on the host in Spark Standalone.
-  val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources"
-  val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json"
-  val RESOURCES_LOCK_FILE = "__allocated_resources__.lock"
 
   /**
    * A mutable resource information which provides more efficient modification 
on addresses.
@@ -87,249 +78,6 @@ private[spark] object StandaloneResourceUtils extends 
Logging {
   }
 
   /**
-   * Assigns (if coordinate needed) resources to workers/drivers from the same host to avoid
-   * address conflict.
-   *
-   * This function works in three steps. First, acquiring the lock on RESOURCES_LOCK_FILE
-   * to achieve synchronization among workers and drivers. Second, getting all allocated
-   * resources from ALLOCATED_RESOURCES_FILE and assigning isolated resources to the worker
-   * or driver after differentiating available resources in discovered resources from
-   * allocated resources. If available resources don't meet worker's or driver's requirement,
-   * try to update allocated resources by excluding the resource allocation if its related
-   * process has already terminated and do the assignment again. If still don't meet requirement,
-   * exception should be thrown. Third, updating ALLOCATED_RESOURCES_FILE with new allocated
-   * resources along with pid for the worker or driver. Then, return allocated resources
-   * information after releasing the lock.
-   *
-   * @param conf SparkConf
-   * @param componentName spark.driver / spark.worker
-   * @param resources the resources found by worker/driver on the host
-   * @param pid the process id of worker/driver to acquire resources.
-   * @return allocated resources for the worker/driver or throws exception if can't
-   *         meet worker/driver's requirement
-   */
-  def acquireResources(
-      conf: SparkConf,
-      componentName: String,
-      resources: Map[String, ResourceInformation],
-      pid: Int)
-    : Map[String, ResourceInformation] = {
-    if (!needCoordinate(conf)) {
-      return resources
-    }
-    val resourceRequirements = parseResourceRequirements(conf, componentName)
-    if (resourceRequirements.isEmpty) {
-      return Map.empty
-    }
-    val lock = acquireLock(conf)
-    try {
-      val resourcesFile = new File(getOrCreateResourcesDir(conf), 
ALLOCATED_RESOURCES_FILE)
-      // all allocated resources in ALLOCATED_RESOURCES_FILE, can be updated 
if any allocations'
-      // related processes detected to be terminated while checking pids below.
-      var origAllocation = Seq.empty[StandaloneResourceAllocation]
-      // Map[pid -> Map[resourceName -> Addresses[]]]
-      var allocated = {
-        if (resourcesFile.exists()) {
-          origAllocation = allocatedStandaloneResources(resourcesFile.getPath)
-          val allocations = origAllocation.map { resource =>
-            val resourceMap = {
-              resource.allocations.map { allocation =>
-                allocation.id.resourceName -> allocation.addresses.toArray
-              }.toMap
-            }
-            resource.pid -> resourceMap
-          }.toMap
-          allocations
-        } else {
-          Map.empty[Int, Map[String, Array[String]]]
-        }
-      }
-
-      // new allocated resources for worker or driver,
-      // map from resource name to its allocated addresses.
-      var newAssignments: Map[String, Array[String]] = null
-      // Whether we've checked process status and we'll only do the check at 
most once.
-      // Do the check iff the available resources can't meet the requirements 
at the first time.
-      var checked = false
-      // Whether we need to keep allocating for the worker/driver and we'll 
only go through
-      // the loop at most twice.
-      var keepAllocating = true
-      while (keepAllocating) {
-        keepAllocating = false
-        // store the pid whose related allocated resources conflict with
-        // discovered resources passed in.
-        val pidsToCheck = mutable.Set[Int]()
-        newAssignments = resourceRequirements.map { req =>
-          val rName = req.resourceName
-          val amount = req.amount
-          // initially, we must have available.length >= amount as we've done 
pre-check previously
-          var available = resources(rName).addresses
-          // gets available resource addresses by excluding all
-          // allocated resource addresses from discovered resources
-          allocated.foreach { a =>
-            val thePid = a._1
-            val resourceMap = a._2
-            val assigned = resourceMap.getOrElse(rName, Array.empty)
-            val retained = available.diff(assigned)
-            // if len(retained) < len(available) after differ to assigned, 
then, there must be
-            // some conflicting resources addresses between available and 
assigned. So, we should
-            // store its pid here to check whether it's alive in case we don't 
find enough
-            // resources after traversal all allocated resources.
-            if (retained.length < available.length && !checked) {
-              pidsToCheck += thePid
-            }
-            if (retained.length >= amount) {
-              available = retained
-            } else if (checked) {
-              keepAllocating = false
-              throw new SparkException(s"No more resources available since 
they've already" +
-                s" assigned to other workers/drivers.")
-            } else {
-              keepAllocating = true
-            }
-          }
-          val assigned = {
-            if (keepAllocating) { // can't meet the requirement
-              // excludes the allocation whose related process has already 
been terminated.
-              val (invalid, valid) = allocated.partition { a =>
-                pidsToCheck(a._1) && !(Utils.isTesting || 
Utils.isProcessRunning(a._1))}
-              allocated = valid
-              origAllocation = origAllocation.filter(
-                allocation => !invalid.contains(allocation.pid))
-              checked = true
-              // note this is a meaningless return value, just to avoid 
creating any new object
-              available
-            } else {
-              available.take(amount)
-            }
-          }
-          rName -> assigned
-        }.toMap
-      }
-      val newAllocation = {
-        val allocations = newAssignments.map { case (rName, addresses) =>
-          ResourceAllocation(new ResourceID(componentName, rName), addresses)
-        }.toSeq
-        StandaloneResourceAllocation(pid, allocations)
-      }
-      writeResourceAllocationJson(
-        componentName, origAllocation ++ Seq(newAllocation), resourcesFile)
-      newAllocation.toResourceInformationMap
-    } finally {
-      releaseLock(lock)
-    }
-  }
-
-  /**
-   * Frees (if coordinate needed) all the resources a worker/driver (pid) has 
in one shot
-   * to make those resources be available for other workers/drivers on the 
same host.
-   * @param conf SparkConf
-   * @param componentName spark.driver / spark.worker
-   * @param toRelease the resources expected to release
-   * @param pid the process id of worker/driver to release resources.
-   */
-  def releaseResources(
-      conf: SparkConf,
-      componentName: String,
-      toRelease: Map[String, ResourceInformation],
-      pid: Int)
-    : Unit = {
-    if (!needCoordinate(conf)) {
-      return
-    }
-    if (toRelease != null && toRelease.nonEmpty) {
-      val lock = acquireLock(conf)
-      try {
-        val resourcesFile = new File(getOrCreateResourcesDir(conf), 
ALLOCATED_RESOURCES_FILE)
-        if (resourcesFile.exists()) {
-          val (target, others) =
-            
allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid)
-          if (target.nonEmpty) {
-            if (others.isEmpty) {
-              if (!resourcesFile.delete()) {
-                logError(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
-              }
-            } else {
-              writeResourceAllocationJson(componentName, others, resourcesFile)
-            }
-            logDebug(s"$componentName(pid=$pid) released resources: 
${toRelease.mkString("\n")}")
-          } else {
-            logWarning(s"$componentName(pid=$pid) has already released its 
resources.")
-          }
-        }
-      } finally {
-        releaseLock(lock)
-      }
-    }
-  }
-
-  private def acquireLock(conf: SparkConf): FileLock = {
-    val resourcesDir = getOrCreateResourcesDir(conf)
-    val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE)
-    val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel
-    var keepTry = true
-    var lock: FileLock = null
-    while (keepTry) {
-      try {
-        lock = lockFileChannel.lock()
-        logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.")
-        keepTry = false
-      } catch {
-        case e: OverlappingFileLockException =>
-          // This exception throws when we're in LocalSparkCluster mode. 
FileLock is designed
-          // to be used across JVMs, but our LocalSparkCluster is designed to 
launch multiple
-          // workers in the same JVM. As a result, when an worker in 
LocalSparkCluster try to
-          // acquire the lock on `resources.lock` which already locked by 
other worker, we'll
-          // hit this exception. So, we should manually control it.
-          keepTry = true
-          // there may be multiple workers race for the lock,
-          // so, sleep for a random time to avoid possible conflict
-          val duration = Random.nextInt(1000) + 1000
-          Thread.sleep(duration)
-      }
-    }
-    assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.")
-    lock
-  }
-
-  private def releaseLock(lock: FileLock): Unit = {
-    try {
-      lock.release()
-      lock.channel().close()
-      logInfo(s"Released lock on $RESOURCES_LOCK_FILE.")
-    } catch {
-      case e: Exception =>
-        logError(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e)
-    }
-  }
-
-  private def getOrCreateResourcesDir(conf: SparkConf): File = {
-    val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse {
-      val sparkHome = if (Utils.isTesting) {
-        assert(sys.props.contains("spark.test.home") ||
-          sys.env.contains("SPARK_HOME"), "spark.test.home or SPARK_HOME is 
not set.")
-        sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
-      } else {
-        sys.env.getOrElse("SPARK_HOME", ".")
-      }
-      sparkHome
-    })
-    val resourceDir = new File(coordinateDir, SPARK_RESOURCES_COORDINATE_DIR)
-    if (!resourceDir.exists()) {
-      Utils.createDirectory(resourceDir)
-    }
-    resourceDir
-  }
-
-  private def allocatedStandaloneResources(resourcesFile: String)
-  : Seq[StandaloneResourceAllocation] = {
-    withResourcesJson[StandaloneResourceAllocation](resourcesFile) { json =>
-      implicit val formats = DefaultFormats
-      parse(json).extract[Seq[StandaloneResourceAllocation]]
-    }
-  }
-
-  /**
    * Save the allocated resources of driver(cluster only) or executor into a 
JSON formatted
    * resources file. Used in Standalone only.
    * @param componentName spark.driver / spark.executor
@@ -372,11 +120,6 @@ private[spark] object StandaloneResourceUtils extends 
Logging {
     Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
   }
 
-  /** Whether needs to coordinate resources among workers and drivers for user 
*/
-  def needCoordinate(conf: SparkConf): Boolean = {
-    conf.get(SPARK_RESOURCES_COORDINATE)
-  }
-
   def toMutable(immutableResources: Map[String, ResourceInformation])
     : Map[String, MutableResourceInfo] = {
     immutableResources.map { case (rName, rInfo) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4be495a..73cac880 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.{Command, ExecutorDescription, 
ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.StandaloneResourceUtils._
-import org.apache.spark.deploy.master.{DriverState, Master, WorkerResourceInfo}
+import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Tests.IS_TESTING
@@ -57,8 +57,7 @@ private[deploy] class Worker(
     val conf: SparkConf,
     val securityMgr: SecurityManager,
     resourceFileOpt: Option[String] = None,
-    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
-    pid: Int = Utils.getProcessId)
+    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
   extends ThreadSafeRpcEndpoint with Logging {
 
   private val host = rpcEnv.address.host
@@ -205,7 +204,6 @@ private[deploy] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     startExternalShuffleService()
-    releaseResourcesOnInterrupt()
     setupWorkerResources()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
@@ -219,26 +217,13 @@ private[deploy] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  /**
-   * Used to catch the TERM signal from sbin/stop-slave.sh and
-   * release resources before Worker exits
-   */
-  private def releaseResourcesOnInterrupt(): Unit = {
-    SignalUtils.register("TERM") {
-      releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
-      false
-    }
-  }
-
   private def setupWorkerResources(): Unit = {
     try {
-      val allResources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, 
resourceFileOpt)
-      resources = acquireResources(conf, SPARK_WORKER_PREFIX, allResources, 
pid)
+      resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, 
resourceFileOpt)
       logResourceInfo(SPARK_WORKER_PREFIX, resources)
     } catch {
       case e: Exception =>
         logError("Failed to setup worker resources: ", e)
-        releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
         if (!Utils.isTesting) {
           System.exit(1)
         }
@@ -373,7 +358,6 @@ private[deploy] class Worker(
               TimeUnit.SECONDS))
         }
       } else {
-        releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
         logError("All masters are unresponsive! Giving up.")
         System.exit(1)
       }
@@ -472,7 +456,6 @@ private[deploy] class Worker(
       case RegisterWorkerFailed(message) =>
         if (!registered) {
           logError("Worker registration failed: " + message)
-          releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
           System.exit(1)
         }
 
@@ -738,7 +721,6 @@ private[deploy] class Worker(
   }
 
   override def onStop(): Unit = {
-    releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
     cleanupThreadExecutor.shutdownNow()
     metricsSystem.report()
     cancelLastRegistrationRetry()
@@ -875,9 +857,8 @@ private[deploy] object Worker extends Logging {
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
     val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
-    val pid = if (Utils.isTesting) workerNumber.get else Utils.getProcessId
     rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, 
memory,
-      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, 
resourceFileOpt, pid = pid))
+      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, 
resourceFileOpt))
     rpcEnv
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 37ce178..23c31a5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -37,23 +37,6 @@ package object config {
   private[spark] val SPARK_TASK_PREFIX = "spark.task"
   private[spark] val LISTENER_BUS_EVENT_QUEUE_PREFIX = 
"spark.scheduler.listenerbus.eventqueue"
 
-  private[spark] val SPARK_RESOURCES_COORDINATE =
-    ConfigBuilder("spark.resources.coordinateResourcesInStandalone")
-      .doc("Whether to coordinate resources automatically among 
workers/drivers(client only) " +
-        "in Standalone. If false, the user is responsible for configuring 
different resources " +
-        "for workers/drivers that run on the same host.")
-      .booleanConf
-      .createWithDefault(true)
-
-  private[spark] val SPARK_RESOURCES_DIR =
-    ConfigBuilder("spark.resources.dir")
-      .doc("Directory used to coordinate resources among 
workers/drivers(client only) in " +
-        "Standalone. Default is SPARK_HOME. Make sure to use the same 
directory for worker " +
-        "and drivers in client mode that run on the same host. Don't clean up 
this directory " +
-        "while workers/drivers are still alive to avoid the most likely 
resources conflict. ")
-      .stringConf
-      .createOptional
-
   private[spark] val RESOURCES_DISCOVERY_PLUGIN =
     ConfigBuilder("spark.resources.discoveryPlugin")
       .doc("Comma-separated list of class names implementing" +
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index dde4323..9f332ba 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2547,28 +2547,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Given a process id, return true if the process is still running.
-   */
-  def isProcessRunning(pid: Int): Boolean = {
-    val process = executeCommand(Seq("kill", "-0", pid.toString))
-    process.waitFor(10, TimeUnit.SECONDS)
-    process.exitValue() == 0
-  }
-
-  /**
-   * Returns the pid of this JVM process.
-   */
-  def getProcessId: Int = {
-    val PROCESS = "(\\d+)@(.*)".r
-    val name = getProcessName()
-    name match {
-      case PROCESS(pid, _) => pid.toInt
-      case _ =>
-        throw new SparkException(s"Unexpected process name: $name, expected to 
be PID@hostname.")
-    }
-  }
-
-  /**
    * Returns the name of this JVM process. This is OS dependent but typically 
(OSX, Linux, Windows),
    * this is formatted as PID@hostname.
    */
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index df9c7c5..9f8fa89 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -841,7 +841,6 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
         .setAppName("test-cluster")
         .set(DRIVER_GPU_ID.amountConf, "3")
         .set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
-        .set(SPARK_RESOURCES_DIR, dir.getName())
 
       sc = new SparkContext(conf)
 
@@ -924,7 +923,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val discoveryScript = createTempScriptWithExpectedOutput(dir, 
"resourceDiscoveryScript",
-        """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", 
"8"]}""")
+        """{"name": "gpu","addresses":["0", "1", "2"]}""")
 
       val conf = new SparkConf()
         .setMaster("local-cluster[3, 1, 1024]")
@@ -933,7 +932,6 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
         .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
         .set(TASK_GPU_ID.amountConf, "3")
         .set(EXECUTOR_GPU_ID.amountConf, "3")
-        .set(SPARK_RESOURCES_DIR, dir.getName())
 
       sc = new SparkContext(conf)
 
@@ -945,7 +943,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
         context.resources().get(GPU).get.addresses.iterator
       }
       val gpus = rdd.collect()
-      assert(gpus.sorted === Seq("0", "1", "2", "3", "4", "5", "6", "7", "8"))
+      assert(gpus.sorted === Seq("0", "0", "0", "1", "1", "1", "2", "2", "2"))
 
       eventually(timeout(10.seconds)) {
         assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum 
== 0)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index bb541b4..2d3d0af 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.{SecurityManager, SparkConf, 
SparkFunSuite}
 import org.apache.spark.TestUtils.{createTempJsonFile, 
createTempScriptWithExpectedOutput}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
 import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, 
ExecutorStateChanged, WorkDirCleanup}
-import 
org.apache.spark.deploy.StandaloneResourceUtils.{ALLOCATED_RESOURCES_FILE, 
SPARK_RESOURCES_COORDINATE_DIR}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Worker._
@@ -64,7 +63,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
   private def makeWorker(
       conf: SparkConf = new SparkConf(),
       shuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
-      pid: Int = Utils.getProcessId,
       local: Boolean = false): Worker = {
     assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
     val securityMgr = new SecurityManager(conf)
@@ -72,7 +70,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
     val resourcesFile = conf.get(SPARK_WORKER_RESOURCE_FILE)
     val localWorker = new Worker(rpcEnv, 50000, 20, 1234 * 5,
       Array.fill(1)(RpcAddress("1.2.3.4", 1234)), "Worker", "/tmp",
-      conf, securityMgr, resourcesFile, shuffleServiceSupplier, pid)
+      conf, securityMgr, resourcesFile, shuffleServiceSupplier)
     if (local) {
       localWorker
     } else {
@@ -81,14 +79,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
     }
   }
 
-  private def assertResourcesFileDeleted(): Unit = {
-    assert(sys.props.contains("spark.test.home"))
-    val sparkHome = sys.props.get("spark.test.home")
-    val resourceFile = new File(sparkHome + "/" + 
SPARK_RESOURCES_COORDINATE_DIR,
-      ALLOCATED_RESOURCES_FILE)
-    assert(!resourceFile.exists())
-  }
-
   before {
     MockitoAnnotations.initMocks(this)
   }
@@ -251,7 +241,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
       worker.rpcEnv.shutdown()
       worker.rpcEnv.awaitTermination()
     }
-    assertResourcesFileDeleted()
   }
 
   test("worker could load resources from resources file while launching") {
@@ -273,7 +262,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
         worker.rpcEnv.shutdown()
         worker.rpcEnv.awaitTermination()
       }
-      assertResourcesFileDeleted()
     }
   }
 
@@ -292,7 +280,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
         worker.rpcEnv.shutdown()
         worker.rpcEnv.awaitTermination()
       }
-      assertResourcesFileDeleted()
     }
   }
 
@@ -316,65 +303,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
         worker.rpcEnv.shutdown()
         worker.rpcEnv.awaitTermination()
       }
-      assertResourcesFileDeleted()
-    }
-  }
-
-  test("Workers run on the same host should avoid resources conflict when 
coordinate is on") {
-    val conf = new SparkConf()
-    withTempDir { dir =>
-      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
-        """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
-      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
-      conf.set(WORKER_FPGA_ID.amountConf, "2")
-      val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = 
true))
-      workers.zipWithIndex.foreach{case (w, i) => 
w.rpcEnv.setupEndpoint(s"worker$i", w)}
-      eventually(timeout(20.seconds)) {
-        val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
-        assert(empty.length === 1)
-        assert(nonEmpty.length === 2)
-        val totalResources = 
nonEmpty.flatMap(_.resources(FPGA).addresses).toSet.toSeq.sorted
-        assert(totalResources === Seq("f1", "f2", "f3", "f4"))
-        workers.foreach(_.rpcEnv.shutdown())
-        workers.foreach(_.rpcEnv.awaitTermination())
-      }
-      assertResourcesFileDeleted()
-    }
-  }
-
-  test("Workers run on the same host should load resources naively when 
coordinate is off") {
-    val conf = new SparkConf()
-    // disable coordination
-    conf.set(config.SPARK_RESOURCES_COORDINATE, false)
-    withTempDir { dir =>
-      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("g0", "g1"))
-      val ja = Extraction.decompose(Seq(gpuArgs))
-      val resourcesPath = createTempJsonFile(dir, "resources", ja)
-      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
-        """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
-      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
-      conf.set(WORKER_GPU_ID.amountConf, "2")
-      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
-      conf.set(WORKER_FPGA_ID.amountConf, "2")
-      val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = 
true))
-      workers.zipWithIndex.foreach{case (w, i) => 
w.rpcEnv.setupEndpoint(s"worker$i", w)}
-      eventually(timeout(20.seconds)) {
-        val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
-        assert(empty.length === 0)
-        assert(nonEmpty.length === 3)
-        // Each Worker should get the same resources from resources file and 
discovery script
-        // without coordination. Note that, normally, we must config different 
resources
-        // for workers run on the same host when coordinate config is off. 
Test here is used
-        // to validate the different behaviour comparing to the above test 
when coordinate config
-        // is on, so we admit the resources collision here.
-        nonEmpty.foreach { worker =>
-          assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
-            FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3", 
"f4", "f5"))))
-        }
-        workers.foreach(_.rpcEnv.shutdown())
-        workers.foreach(_.rpcEnv.awaitTermination())
-      }
-      assertResourcesFileDeleted()
     }
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
 
b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
index 437c903..ff7d680 100644
--- 
a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
@@ -50,7 +50,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with 
LocalSparkContext
         .set(WORKER_GPU_ID.amountConf, "2")
         .set(TASK_GPU_ID.amountConf, "1")
         .set(EXECUTOR_GPU_ID.amountConf, "1")
-        .set(SPARK_RESOURCES_DIR, dir.getName())
         .set(WORKER_FPGA_ID.amountConf, "2")
         .set(TASK_FPGA_ID.amountConf, "1")
         .set(EXECUTOR_FPGA_ID.amountConf, "1")
@@ -81,7 +80,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite with 
LocalSparkContext
         .set(WORKER_GPU_ID.amountConf, "2")
         .set(TASK_GPU_ID.amountConf, "1")
         .set(EXECUTOR_GPU_ID.amountConf, "1")
-        .set(SPARK_RESOURCES_DIR, dir.getName())
 
       sc = new SparkContext(conf)
       TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
@@ -108,7 +106,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite 
with LocalSparkContext
         .set(WORKER_GPU_ID.amountConf, "2")
         .set(TASK_GPU_ID.amountConf, "1")
         .set(EXECUTOR_GPU_ID.amountConf, "1")
-        .set(SPARK_RESOURCES_DIR, dir.getName())
 
       sc = new SparkContext(conf)
       TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
@@ -134,7 +131,6 @@ class ResourceDiscoveryPluginSuite extends SparkFunSuite 
with LocalSparkContext
         .set(RESOURCES_DISCOVERY_PLUGIN, 
Seq(classOf[TestResourceDiscoveryPluginEmpty].getName()))
         .set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
         .set(DRIVER_GPU_ID.amountConf, "2")
-        .set(SPARK_RESOURCES_DIR, dir.getName())
 
       sc = new SparkContext(conf)
       TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
diff --git a/docs/configuration.md b/docs/configuration.md
index 469feed..5e6fe93 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -194,25 +194,6 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
- <td><code>spark.resources.coordinateResourcesInStandalone</code></td>
-  <td>true</td>
-  <td>
-    Whether to coordinate resources automatically among workers/drivers(client 
only) 
-    in Standalone. If false, the user is responsible for configuring different 
resources
-    for workers/drivers that run on the same host.
-  </td>
-</tr>
-<tr>
- <td><code>spark.resources.dir</code></td>
-  <td>SPARK_HOME</td>
-  <td>
-    Directory used to coordinate resources among workers/drivers(client only) 
in Standalone.
-    Default is SPARK_HOME. Make sure to use the same directory for worker and 
drivers in
-    client mode that run on the same host. Don't clean up this directory while 
workers/drivers
-    are still alive to avoid the most likely resources conflict. 
-  </td>
-</tr>
-<tr>
  <td><code>spark.driver.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
@@ -228,9 +209,8 @@ of the most common options to set are:
   <td>
     A script for the driver to run to discover a particular resource type. 
This should
     write to STDOUT a JSON string in the format of the ResourceInformation 
class. This has a
-    name and an array of addresses. For a client-submitted driver in 
Standalone, discovery
-    script must assign different resource addresses to this driver comparing 
to workers' and
-    other drivers' when 
<code>spark.resources.coordinateResourcesInStandalone</code> is off.
+    name and an array of addresses. For a client-submitted driver, discovery 
script must assign
+    different resource addresses to this driver comparing to other drivers on 
the same host.
   </td>
 </tr>
 <tr>
@@ -2755,5 +2735,4 @@ There are configurations available to request resources 
for the driver: <code>sp
 
 Spark will use the configurations specified to first request containers with 
the corresponding resources from the cluster manager. Once it gets the 
container, Spark launches an Executor in that container which will discover 
what resources the container has and the addresses associated with each 
resource. The Executor will register with the Driver and report back the 
resources available to that Executor. The Spark scheduler can then schedule 
tasks to each Executor and assign specific reso [...]
 
-See your cluster manager specific page for requirements and details on each of 
- [YARN](running-on-yarn.html#resource-allocation-and-configuration-overview), 
[Kubernetes](running-on-kubernetes.html#resource-allocation-and-configuration-overview)
 and [Standalone 
Mode](spark-standalone.html#resource-allocation-and-configuration-overview). It 
is currently not available with Mesos or local mode. If using local-cluster 
mode see the Spark Standalone documentation but be aware only a single wor [...]
-
+See your cluster manager specific page for requirements and details on each of 
- [YARN](running-on-yarn.html#resource-allocation-and-configuration-overview), 
[Kubernetes](running-on-kubernetes.html#resource-allocation-and-configuration-overview)
 and [Standalone 
Mode](spark-standalone.html#resource-allocation-and-configuration-overview). It 
is currently not available with Mesos or local mode. And please also note that 
local-cluster mode with multiple workers is not supported(see Standalon [...]
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index ff310ed..17b6772 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -256,8 +256,6 @@ SPARK_MASTER_OPTS supports the following system properties:
   <td>
     Path to resource discovery script, which is used to find a particular 
resource while worker starting up.
     And the output of the script should be formatted like the 
<code>ResourceInformation</code> class.
-    When <code>spark.resources.coordinateResourcesInStandalone</code> is off, 
the discovery script must assign different
-    resources for workers and drivers in client mode that run on the same host 
to avoid resource conflict.
   </td>
 </tr>
 <tr>
@@ -266,9 +264,7 @@ SPARK_MASTER_OPTS supports the following system properties:
   <td>
     Path to resources file which is used to find various resources while 
worker starting up.
     The content of resources file should be formatted like <code>
-    [[{"id":{"componentName": 
"spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]]</code>.
-    When <code>spark.resources.coordinateResourcesInStandalone</code> is off, 
resources file must assign different
-    resources for workers and drivers in client mode that run on the same host 
to avoid resource conflict.
+    [{"id":{"componentName": 
"spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]</code>.
     If a particular resource is not found in the resources file, the discovery 
script would be used to
     find that resource. If the discovery script also does not find the 
resources, the worker will fail
     to start up.
@@ -346,9 +342,9 @@ Please make sure to have read the Custom Resource 
Scheduling and Configuration O
 
 Spark Standalone has 2 parts, the first is configuring the resources for the 
Worker, the second is the resource allocation for a specific application.
 
-The user must configure the Workers to have a set of resources available so 
that it can assign them out to Executors. The 
<code>spark.worker.resource.{resourceName}.amount</code> is used to control the 
amount of each resource the worker has allocated. The user must also specify 
either <code>spark.worker.resourcesFile</code> or 
<code>spark.worker.resource.{resourceName}.discoveryScript</code> to specify 
how the Worker discovers the resources its assigned. See the descriptions above 
for ea [...]
+The user must configure the Workers to have a set of resources available so 
that it can assign them out to Executors. The 
<code>spark.worker.resource.{resourceName}.amount</code> is used to control the 
amount of each resource the worker has allocated. The user must also specify 
either <code>spark.worker.resourcesFile</code> or 
<code>spark.worker.resource.{resourceName}.discoveryScript</code> to specify 
how the Worker discovers the resources its assigned. See the descriptions above 
for ea [...]
 
-The second part is running an application on Spark Standalone. The only 
special case from the standard Spark resource configs is when you are running 
the Driver in client mode. For a Driver in client mode, the user can specify 
the resources it uses via <code>spark.driver.resourcesfile</code> or 
<code>spark.driver.resources.{resourceName}.discoveryScript</code>. If the 
Driver is running on the same host as other Drivers or Workers there are 2 ways 
to make sure the they don't use the same  [...]
+The second part is running an application on Spark Standalone. The only 
special case from the standard Spark resource configs is when you are running 
the Driver in client mode. For a Driver in client mode, the user can specify 
the resources it uses via <code>spark.driver.resourcesfile</code> or 
<code>spark.driver.resource.{resourceName}.discoveryScript</code>. If the 
Driver is running on the same host as other Drivers, please make sure the 
resources file or discovery script only returns  [...]
 
 Note, the user does not need to specify a discovery script when submitting an 
application as the Worker will start each Executor with the resources it 
allocates to it.
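
For readers of the spark-standalone.md section above, a minimal sketch of how a client-mode
driver declares its own resources with the standard
`spark.driver.resource.{resourceName}.amount` / `.discoveryScript` configs (the master URL and
script path below are placeholders, not values from this commit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Client-mode driver on a shared host: point the driver at a discovery script
// (or a resources file) that returns only the addresses reserved for this driver,
// since automatic per-host coordination is removed by this commit.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077") // placeholder master URL
  .setAppName("client-mode-driver-resources-sketch")
  .set("spark.driver.resource.gpu.amount", "1")
  .set("spark.driver.resource.gpu.discoveryScript", "/path/to/getDriverGpus.sh") // placeholder path

val sc = new SparkContext(conf)
```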
 

