This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d30284b  [SPARK-27760][CORE] Spark resources - change user resource config from .count to .amount
d30284b is described below

commit d30284b5a51dd784f663eb4eea37087b35a54d00
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Thu Jun 6 14:16:05 2019 -0500

    [SPARK-27760][CORE] Spark resources - change user resource config from .count to .amount
    
    ## What changes were proposed in this pull request?
    
    Change the resource config spark.{executor/driver}.resource.{resourceName}.count to .amount
    to allow future support for specifying both a count and a unit. Right now we only support
    counts - # of GPUs, for instance - but in the future we may want to support units for things
    like memory - 25G. Making the user specify a single .amount config is better than making them
    specify two separate configs, a .count and a .unit. Change it now since it's a user-facing
    config.
    
    Amount also matches how the Spark on YARN configs are set up.
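
    As a rough illustration (not part of this patch), an application requesting GPUs
    through the renamed configs might look like the sketch below; the app name, master,
    and discovery script path are placeholders:

        import org.apache.spark.{SparkConf, SparkContext}

        // Ask for 2 GPUs per executor and 1 GPU per task using the new .amount suffix.
        // "/opt/spark/getGpusResources.sh" is a hypothetical discovery script that
        // reports the GPU addresses available on the host.
        val conf = new SparkConf()
          .setAppName("gpu-amount-example")
          .setMaster("yarn")
          .set("spark.executor.resource.gpu.amount", "2")
          .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpusResources.sh")
          .set("spark.task.resource.gpu.amount", "1")
        val sc = new SparkContext(conf)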
    
    ## How was this patch tested?
    
    Unit tests, and manually verified on YARN and in local-cluster mode.
    
    Closes #24810 from tgravescs/SPARK-27760-amount.
    
    Authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  4 +--
 .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++----
 .../main/scala/org/apache/spark/TestUtils.scala    |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala    |  2 +-
 .../org/apache/spark/internal/config/package.scala |  2 +-
 .../org/apache/spark/ResourceDiscovererSuite.scala |  2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  8 ++---
 .../scala/org/apache/spark/SparkContextSuite.scala | 24 +++++++--------
 .../CoarseGrainedExecutorBackendSuite.scala        | 26 ++++++++--------
 .../CoarseGrainedSchedulerBackendSuite.scala       |  2 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  4 +--
 docs/configuration.md                              | 14 ++++-----
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  |  4 +--
 .../k8s/features/BasicDriverFeatureStepSuite.scala |  2 +-
 .../features/BasicExecutorFeatureStepSuite.scala   |  4 +--
 .../spark/deploy/yarn/ResourceRequestHelper.scala  |  8 ++---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 36 ++++++++++++++++++----
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     |  4 +--
 19 files changed, 93 insertions(+), 69 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 227f4a5..e231a40 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -512,8 +512,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    */
   private[spark] def getTaskResourceRequirements(): Map[String, Int] = {
     getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
-      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_SUFFIX)}
-      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_SUFFIX.length), v.toInt)}.toMap
+      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_AMOUNT_SUFFIX)}
+      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_AMOUNT_SUFFIX.length), v.toInt)}.toMap
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66f8f41..c169842 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     // verify the resources we discovered are what the user requested
     val driverReqResourcesAndCounts =
-      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
     ResourceDiscoverer.checkActualResourcesMeetRequirements(driverReqResourcesAndCounts, _resources)
 
     logInfo("===============================================================================")
@@ -2725,7 +2725,7 @@ object SparkContext extends Logging {
       // executor and resources required by each task.
       val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
       val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-        SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+        SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
       var numSlots = execCores / taskCores
       var limitingResourceName = "CPU"
       taskResourcesAndCount.foreach { case (rName, taskCount) =>
@@ -2733,17 +2733,17 @@ object SparkContext extends Logging {
         val execCount = executorResourcesAndCounts.getOrElse(rName,
           throw new SparkException(
             s"The executor resource config: " +
-              s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} " +
+              s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} " +
               "needs to be specified since a task requirement config: " +
-              s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} was specified")
+              s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
         )
         // Make sure the executor resources are large enough to launch at 
least one task.
         if (execCount.toLong < taskCount.toLong) {
           throw new SparkException(
             s"The executor resource config: " +
-              s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} " +
+              s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} " +
               s"= $execCount has to be >= the task config: " +
-              s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount")
+              s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
         }
         // Compare and update the max slots each executor can provide.
         val resourceNumSlots = execCount.toInt / taskCount
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index d306eed..5d88612 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -319,7 +319,7 @@ private[spark] object TestUtils {
       conf: SparkConf,
       resourceName: String,
       resourceCount: Int): SparkConf = {
-    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
     conf.set(key, resourceCount.toString)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 13bf8a9..bfb7976 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,7 +106,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
       val execReqResourcesAndCounts =
         env.conf.getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX,
-          SPARK_RESOURCE_COUNT_SUFFIX).toMap
+          SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
 
       ResourceDiscoverer.checkActualResourcesMeetRequirements(execReqResourcesAndCounts,
         actualExecResources)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 90826bb..8d4910f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -35,7 +35,7 @@ package object config {
   private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource."
   private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
 
-  private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
+  private[spark] val SPARK_RESOURCE_AMOUNT_SUFFIX = ".amount"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
   private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"
 
diff --git a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
index 400c9ca..2272573 100644
--- a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
@@ -224,7 +224,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
   test("gpu's specified but not discovery script") {
     val sparkconf = new SparkConf
     sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-      SPARK_RESOURCE_COUNT_SUFFIX, "2")
+      SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
 
     val error = intercept[SparkException] {
       ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 6978f30..c1265ce 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -450,16 +450,16 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("get task resource requirement from config") {
     val conf = new SparkConf()
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
     var taskResourceRequirement = conf.getTaskResourceRequirements()
     assert(taskResourceRequirement.size == 2)
     assert(taskResourceRequirement(GPU) == 2)
     assert(taskResourceRequirement(FPGA) == 1)
 
-    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX)
+    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX)
     // Ignore invalid prefix
-    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
     taskResourceRequirement = conf.getTaskResourceRequirements()
     assert(taskResourceRequirement.size == 1)
     assert(taskResourceRequirement.get(FPGA).isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 40ec1b2..d1a36d4 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -747,7 +747,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
       val conf = new SparkConf()
         .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_COUNT_SUFFIX, "1")
+          SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
         .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
           SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
         .setMaster("local-cluster[1, 1, 1024]")
@@ -784,7 +784,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
       val conf = new SparkConf()
         .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_COUNT_SUFFIX, "1")
+          SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
         .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
           SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
         .set(DRIVER_RESOURCES_FILE, resourcesFile)
@@ -806,7 +806,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("Test parsing resources task configs with missing executor config") {
     val conf = new SparkConf()
       .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
 
@@ -814,17 +814,17 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("The executor resource config: 
spark.executor.resource.gpu.count " +
-      "needs to be specified since a task requirement config: 
spark.task.resource.gpu.count " +
+    assert(error.contains("The executor resource config: 
spark.executor.resource.gpu.amount " +
+      "needs to be specified since a task requirement config: 
spark.task.resource.gpu.amount " +
       "was specified"))
   }
 
   test("Test parsing resources executor config < task requirements") {
     val conf = new SparkConf()
       .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "2")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
       .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
 
@@ -833,14 +833,14 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }.getMessage()
 
     assert(error.contains("The executor resource config: " +
-      "spark.executor.resource.gpu.count = 1 has to be >= the task config: " +
-      "spark.task.resource.gpu.count = 2"))
+      "spark.executor.resource.gpu.amount = 1 has to be >= the task config: " +
+      "spark.task.resource.gpu.amount = 2"))
   }
 
   test("Parse resources executor config not the same multiple numbers of the 
task requirements") {
     val conf = new SparkConf()
-      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
+      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
 
@@ -873,7 +873,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       val discoveryScript = resourceFile.getPath()
 
       val conf = new SparkConf()
-        .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+        .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
         .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX}",
           discoveryScript)
         .setMaster("local-cluster[3, 3, 1024]")
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 0b4ad0d..3c4d51f 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -57,7 +57,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("parsing no resources") {
     val conf = new SparkConf
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
 
@@ -79,8 +79,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("parsing one resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -103,10 +103,10 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("parsing multiple resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -136,8 +136,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("error checking parsing resources and executor and task configs") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -178,8 +178,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("executor resource found less than required") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -204,8 +204,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
   test("use discoverer") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 6b3916b..70d368e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -187,7 +187,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
 
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 3)
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "3")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
       .setMaster(
       "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 456996c..9a4c7a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1251,9 +1251,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val executorCpus = 4
     val taskScheduler = setupScheduler(numCores = executorCpus,
       config.CPUS_PER_TASK.key -> taskCpus.toString,
-      s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_COUNT_SUFFIX}" ->
+      s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" ->
         taskGpus.toString,
-      s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_COUNT_SUFFIX}" ->
+      s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" ->
         executorGpus.toString,
       config.EXECUTOR_CORES.key -> executorCpus.toString)
     val taskSet = FakeTask.createTaskSet(3)
diff --git a/docs/configuration.md b/docs/configuration.md
index 6632dea..7b7d6cc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -194,10 +194,10 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
- <td><code>spark.driver.resource.{resourceName}.count</code></td>
+ <td><code>spark.driver.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
-    The number of a particular resource type to use on the driver.
+    Amount of a particular resource type to use on the driver.
     If this is used, you must also specify the
     <code>spark.driver.resource.{resourceName}.discoveryScript</code>
     for the driver to find the resource on startup.
@@ -264,10 +264,10 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
- <td><code>spark.executor.resource.{resourceName}.count</code></td>
+ <td><code>spark.executor.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
-    The number of a particular resource type to use per executor process.
+    Amount of a particular resource type to use per executor process.
     If this is used, you must also specify the
     <code>spark.executor.resource.{resourceName}.discoveryScript</code>
     for the executor to find the resource on startup.
@@ -1888,11 +1888,11 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.task.resource.{resourceName}.count</code></td>
+  <td><code>spark.task.resource.{resourceName}.amount</code></td>
   <td>1</td>
   <td>
-    Number of a particular resource type to allocate for each task. If this is specified
-    you must also provide the executor config <code>spark.executor.resource.{resourceName}.count</code>
+    Amount of a particular resource type to allocate for each task. If this is specified
+    you must also provide the executor config <code>spark.executor.resource.{resourceName}.amount</code>
     and any corresponding discovery configs so that your executors are created with that resource type.
   </td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 522c8f7..25b997f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SPARK_RESOURCE_COUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
+import org.apache.spark.internal.config.{SPARK_RESOURCE_AMOUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.Utils.getHadoopFileSystem
@@ -228,7 +228,7 @@ private[spark] object KubernetesUtils extends Logging {
       sparkConf: SparkConf): Map[String, Quantity] = {
     val allResources = sparkConf.getAllWithPrefix(componentName)
     val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
-    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
     val uniqueResources = SparkConf.getBaseOfConfigs(allResources)
 
     uniqueResources.map { rName =>
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index f60c6fb..df326a2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -56,7 +56,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
     resources.foreach { case (_, testRInfo) =>
       sparkConf.set(
-        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         testRInfo.count)
       sparkConf.set(
         s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 3e892a9..eb5532e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -97,7 +97,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     // test missing vendor
     gpuResources.foreach { case (_, testRInfo) =>
       baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         testRInfo.count)
     }
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
@@ -127,7 +127,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
     gpuResources.foreach { case (_, testRInfo) =>
       baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         testRInfo.count)
       baseConf.set(
         s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 66e4781..7480dd8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -68,13 +68,13 @@ private object ResourceRequestHelper extends Logging {
       (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu{SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
         s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
 
     val errorMessage = new mutable.StringBuilder()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 7c8ee03..0f11820 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -55,7 +55,7 @@ object YarnSparkHadoopUtil {
       ): Map[String, String] = {
     Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> 
YARN_FPGA_RESOURCE_CONFIG).map {
       case (rName, yarnName) =>
-        val resourceCountSparkConf = 
s"${confPrefix}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+        val resourceCountSparkConf = 
s"${confPrefix}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
         (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
     }.filter { case (_, count) => count.toLong > 0 }
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2c2c237..884e0f5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -390,7 +390,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
-  test(s"custom resource request yarn config and spark config fails") {
+  test("custom driver resource request yarn config and spark config fails") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
     ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -400,7 +400,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
     }
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
     }
 
     val error = intercept[SparkException] {
@@ -408,12 +408,36 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }.getMessage()
 
     assert(error.contains("Do not use 
spark.yarn.driver.resource.yarn.io/fpga," +
-      " please use spark.driver.resource.fpga"))
+      " please use spark.driver.resource.fpga.amount"))
     assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," 
+
-      " please use spark.driver.resource.gpu"))
+      " please use spark.driver.resource.gpu.amount"))
   }
 
-  test(s"custom resources spark config mapped to yarn config") {
+  test("custom executor resource request yarn config and spark config fails") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.keys.foreach { yarnName =>
+      conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+    }
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+    }
+
+    val error = intercept[SparkException] {
+      ResourceRequestHelper.validateResources(conf)
+    }.getMessage()
+
+    assert(error.contains("Do not use 
spark.yarn.executor.resource.yarn.io/fpga," +
+      " please use spark.executor.resource.fpga.amount"))
+    assert(error.contains("Do not use 
spark.yarn.executor.resource.yarn.io/gpu," +
+      " please use spark.executor.resource.gpu.amount"))
+  }
+
+
+  test("custom resources spark config mapped to yarn config") {
     assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val yarnMadeupResource = "yarn.io/madeup"
     val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
@@ -423,7 +447,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
     }
     // also just set yarn one that we don't convert
     conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b16464c..0040369 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -189,8 +189,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
 
     val sparkResources =
-      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}" -> "3",
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}" -> "2",
+      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "3",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "2",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
     val handler = createAllocator(1, mockAmClient, sparkResources)
 

