This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 96b301ce4e41 [SPARK-47498][TESTS][CORE] Refine some GPU fraction calculation tests
96b301ce4e41 is described below
commit 96b301ce4e414b575352c431c31412310c7f168b
Author: Bobby Wang <[email protected]>
AuthorDate: Wed Mar 27 11:47:51 2024 +0800
[SPARK-47498][TESTS][CORE] Refine some GPU fraction calculation tests
### What changes were proposed in this pull request?
This PR refines some fractional GPU resource calculation tests.
### Why are the changes needed?
This PR adds more comments to the tests and refines some assertion blocks.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The existing CI passes.
### Was this patch authored or co-authored using generative AI tooling?
No
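For context, the pattern these suites exercise is a fractional GPU amount requested per task through a resource profile. A minimal sketch of that pattern (illustrative only: `gpuTaskAmount` is a placeholder value here; the tests below build many variations of this):

```scala
// Sketch of the fractional-GPU request pattern exercised by these tests.
// gpuTaskAmount is a hypothetical fraction of one GPU claimed per task.
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val gpuTaskAmount = 0.1
val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount)
val rp = new ResourceProfileBuilder().require(treqs).build()
```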
Closes #45631 from wbo4958/refine-tests.
Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../scheduler/ExecutorResourcesAmountsSuite.scala | 24 +--
.../spark/scheduler/TaskSchedulerImplSuite.scala | 210 +++++++++++----------
2 files changed, 121 insertions(+), 113 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
index 75a772dcdec8..1c5cb041ad6c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
@@ -53,7 +53,7 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU
// assign nothing to rp without resource profile
val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
assert(assigned.isDefined)
- assigned.foreach { case resource => assert(resource.isEmpty) }
+ assigned.foreach(resource => assert(resource.isEmpty))
}
test("Convert ExecutorResourceInfos to ExecutorResourcesAmounts") {
@@ -187,8 +187,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU
val rp = new ResourceProfileBuilder().require(treqs).build()
var assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
- assert(!assigned.isEmpty)
- assigned.foreach { case resource => assert(!resource.isEmpty)}
+ assert(assigned.isDefined)
+ assigned.foreach(resource => assert(resource.nonEmpty))
val treqs1 = new TaskResourceRequests()
.resource("gpu", gpuTaskAmount)
@@ -270,8 +270,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU
// taskMount = 0.1 < 1.0 which can be assigned.
val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
- assert(!assigned.isEmpty)
- assigned.foreach { case resource =>
+ assert(assigned.isDefined)
+ assigned.foreach { resource =>
assert(resource.size === 1)
assert(resource.keys.toSeq === Seq("gpu"))
assert(resource("gpu").size === 1)
@@ -337,8 +337,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU
// taskMount = 0.1 < 1.0 which can be assigned.
val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
- assert(!assigned.isEmpty)
- assigned.foreach { case resourceAmounts =>
+ assert(assigned.isDefined)
+ assigned.foreach { resourceAmounts =>
assert(resourceAmounts.size === 2)
assert(resourceAmounts.keys.toSeq.sorted === Seq("gpu", "fpga").sorted)
@@ -402,17 +402,17 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU
val treqs = new TaskResourceRequests().resource("gpu", taskAmount)
val rp = new ResourceProfileBuilder().require(treqs).build()
val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
- assert(!assigned.isEmpty)
- assigned.foreach { case resources =>
+ assert(assigned.isDefined)
+ assigned.foreach { resources =>
assert(
- resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource(_))
+ resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource)
=== expectedAssignedAmount.sorted)
availableExecResAmounts.acquire(resources)
val leftRes = availableExecResAmounts.availableResources
assert(leftRes.size == 1)
- assert(leftRes.keys.toSeq(0) == "gpu")
+ assert(leftRes.keys.toSeq.head == "gpu")
assert(compareMaps(leftRes("gpu"), expectedLeftRes))
}
}
@@ -424,7 +424,7 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU
val leftRes = availableExecResAmounts.availableResources
assert(leftRes.size == 1)
- assert(leftRes.keys.toSeq(0) == "gpu")
+ assert(leftRes.keys.toSeq.head == "gpu")
assert(compareMaps(leftRes("gpu"), expectedLeftRes))
}
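The suite above round-trips amounts through `toInternalResource`/`ResourceAmountUtils.toFractionalResource`. A hedged sketch of that kind of fixed-point conversion (the scale constant and helper names below are assumptions for illustration, not Spark's actual internals):

```scala
// Hypothetical model of a fixed-point resource-amount encoding; Spark's real
// ResourceAmountUtils uses its own scale constant and API.
object ResourceAmountSketch {
  val OneEntireResource: Long = 10000L // assumed scale: 1.0 GPU == 10000 units

  def toInternal(amount: Double): Long = (amount * OneEntireResource).toLong
  def toFractional(internal: Long): Double = internal.toDouble / OneEntireResource

  def main(args: Array[String]): Unit = {
    val internal = toInternal(0.1)        // a task claims 0.1 of one GPU
    assert(toFractional(internal) == 0.1) // simple fractions round-trip exactly
  }
}
```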
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index d9db5f656176..5cc97410bcce 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -2302,13 +2302,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
taskDescriptions = taskDescriptions.sortBy(t => t.index)
assert(4 * taskNum === taskDescriptions.length)
assert(!failedTaskSet)
- var gpuAddress = -1
- for (taskId <- 0 until 4 * taskNum) {
- if (taskId % taskNum == 0) {
- gpuAddress += 1
- }
- assert(ArrayBuffer(gpuAddress.toString) ===
- taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+ for (task <- taskDescriptions) {
+ // Spark offers the GPU resources in ascending order, starting from the lowest
+ // GPU address and moving to the next address once the current one is exhausted.
+ assert(task.resources(GPU).keys.toArray === Array((task.index / taskNum).toString))
}
}
}
@@ -2331,12 +2328,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
config.EXECUTOR_CORES.key -> executorCpus.toString)
- val taskSet = if (barrierMode) {
- FakeTask.createBarrierTaskSet(4 * taskNum)
- } else {
- FakeTask.createTaskSet(100)
- }
-
val workerOffers =
IndexedSeq(
WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
@@ -2348,22 +2339,24 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0))))))
+ val executorNumber = workerOffers.length
+
+ val taskSet = if (barrierMode) {
+ FakeTask.createBarrierTaskSet(executorNumber * taskNum)
+ } else {
+ FakeTask.createTaskSet(100)
+ }
+
taskScheduler.submitTasks(taskSet)
// Launch tasks on executor that satisfies resource requirements
-
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(4 * taskNum === taskDescriptions.length)
+ assert(executorNumber * taskNum === taskDescriptions.length)
assert(!failedTaskSet)
- val assignedGpus: HashMap[String, Int] = HashMap.empty
- for (taskId <- 0 until 4 * taskNum) {
- val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
- assert(gpus.length == 1)
- val addr = gpus(0)
- if (!assignedGpus.contains(addr)) {
- assignedGpus(addr) = 1
- } else {
- assignedGpus(addr) += 1
- }
+ val assignedGpus: mutable.HashMap[String, Int] = mutable.HashMap.empty
+ taskDescriptions.foreach { task =>
+ val addresses = task.resources(GPU).keys.toArray
+ assert(addresses.length == 1)
+ assignedGpus.update(addresses(0), assignedGpus.getOrElseUpdate(addresses(0), 0) + 1)
}
assert(assignedGpus.toMap ===
Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
@@ -2408,13 +2401,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
taskDescriptions = taskDescriptions.sortBy(t => t.index)
assert(4 * taskNum === taskDescriptions.length)
assert(!failedTaskSet)
- var gpuAddress = -1
- for (taskId <- 0 until 4 * taskNum) {
- if (taskId % taskNum == 0) {
- gpuAddress += 1
- }
- assert(ArrayBuffer(gpuAddress.toString) ===
- taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+ for (task <- taskDescriptions) {
+ // Spark offers the GPU resources in ascending order, starting from the lowest
+ // GPU address and moving to the next address once the current one is exhausted.
+ assert(task.resources(GPU).keys.toArray === Array((task.index / taskNum).toString))
}
}
}
@@ -2439,12 +2429,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
val rp = new TaskResourceProfile(treqs.requests)
taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
- val taskSet = if (barrierMode) {
- FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
- } else {
- FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
- }
-
val workerOffers =
IndexedSeq(
WorkerOffer("executor0", "host0", executorCpus, Some("host1"),
@@ -2461,22 +2445,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
rp.id)
)
+ val executorNumber = workerOffers.length
+
+ val taskSet = if (barrierMode) {
+ FakeTask.createBarrierTaskSet(executorNumber * taskNum, 0, 1, 1, rp.id)
+ } else {
+ FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+ }
+
taskScheduler.submitTasks(taskSet)
// Launch tasks on executor that satisfies resource requirements
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(4 * taskNum === taskDescriptions.length)
+ assert(executorNumber * taskNum === taskDescriptions.length)
assert(!failedTaskSet)
- val assignedGpus: HashMap[String, Int] = HashMap.empty
- for (taskId <- 0 until 4 * taskNum) {
- val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
- assert(gpus.length == 1)
- val addr = gpus(0)
- if (!assignedGpus.contains(addr)) {
- assignedGpus(addr) = 1
- } else {
- assignedGpus(addr) += 1
- }
+
+ val assignedGpus: mutable.HashMap[String, Int] = mutable.HashMap.empty
+ taskDescriptions.foreach { task =>
+ val addresses = task.resources(GPU).keys.toArray
+ assert(addresses.length == 1)
+ assignedGpus.update(addresses(0), assignedGpus.getOrElseUpdate(addresses(0), 0) + 1)
}
assert(assignedGpus.toMap ===
Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
@@ -2487,22 +2475,23 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 executor " +
"can assign to other taskset") {
val taskCpus = 1
- val taskGpus = 0.3
+ val lowerGpuTaskAmount = 0.3
val executorGpus = 4
- val executorCpus = 50
+ val executorCpus = 20
// each tasks require 0.3 gpu
val taskScheduler = setupScheduler(numCores = executorCpus,
config.CPUS_PER_TASK.key -> taskCpus.toString,
- TASK_GPU_ID.amountConf -> taskGpus.toString,
+ TASK_GPU_ID.amountConf -> lowerGpuTaskAmount.toString,
EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
config.EXECUTOR_CORES.key -> executorCpus.toString
)
- val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ val lowerTaskSet = FakeTask.createTaskSet(100, stageId = 1, stageAttemptId = 0,
+ priority = 1, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
// each task require 0.7 gpu
- val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+ val higherGpuTaskAmount = 0.7
+ val treqs = new TaskResourceRequests().cpus(1).resource(GPU, higherGpuTaskAmount)
val rp = new TaskResourceProfile(treqs.requests)
taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
@@ -2519,22 +2508,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
taskScheduler.submitTasks(lowerTaskSet)
taskScheduler.submitTasks(higherRpTaskSet)
- // should have 3 for default profile and 2 for additional resource profile
+ // First, offer the available resources to the higher priority task set, where each task
+ // requires 0.7 GPU, so Spark can assign the GPU resources to a maximum of 4 tasks, leaving
+ // 0.3 of each GPU available.
+ // Then offer the remaining resources to the lower priority task set, where each
+ // task requires 0.3 GPU, so the leftover GPU resources can be offered to a maximum
+ // of 4 lower priority tasks.
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
assert(8 === taskDescriptions.length)
var index = 0
for (tDesc <- taskDescriptions) {
assert(tDesc.resources.contains(GPU))
- val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+ val addresses = tDesc.resources(GPU).keys.toArray.sorted
assert(addresses.length == 1)
+ assert(addresses(0) == tDesc.index.toString)
if (index < 4) { // the first 4 tasks will grab 0.7 gpu
- assert(addresses(0) == index.toString)
assert(ResourceAmountUtils.toFractionalResource(
- tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+ tDesc.resources(GPU)(tDesc.index.toString)) == higherGpuTaskAmount)
} else {
- assert(addresses(0) == (index - 4).toString)
assert(ResourceAmountUtils.toFractionalResource(
- tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3)
+ tDesc.resources(GPU)(tDesc.index.toString)) == lowerGpuTaskAmount)
}
index += 1
}
@@ -2543,14 +2536,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple executors " +
"can assign to other taskset") {
val taskCpus = 1
- val taskGpus = 0.3
+ val lowerTaskGpuAmount = 0.3
val executorGpus = 4
- val executorCpus = 50
+ val executorCpus = 20
// each tasks require 0.3 gpu
val taskScheduler = setupScheduler(numCores = executorCpus,
config.CPUS_PER_TASK.key -> taskCpus.toString,
- TASK_GPU_ID.amountConf -> taskGpus.toString,
+ TASK_GPU_ID.amountConf -> lowerTaskGpuAmount.toString,
EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
config.EXECUTOR_CORES.key -> executorCpus.toString
)
@@ -2558,7 +2551,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
// each task require 0.7 gpu
- val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+ val higherTaskGpuAmount = 0.7
+ val treqs = new TaskResourceRequests().cpus(1).resource(GPU, higherTaskGpuAmount)
val rp = new TaskResourceProfile(treqs.requests)
taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
@@ -2568,21 +2562,36 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
val workerOffers =
IndexedSeq(
// cpu won't be a problem
- WorkerOffer("executor0", "host0", 50, None, new
ExecutorResourcesAmounts(
+ WorkerOffer("executor0", "host0", executorCpus, None, new
ExecutorResourcesAmounts(
Map(GPU -> toInternalResource(Map("0" -> 1.0))))),
- WorkerOffer("executor1", "host1", 50, None, new
ExecutorResourcesAmounts(
+ WorkerOffer("executor1", "host1", executorCpus, None, new
ExecutorResourcesAmounts(
Map(GPU -> toInternalResource(Map("1" -> 1.0))))),
- WorkerOffer("executor2", "host2", 50, None, new
ExecutorResourcesAmounts(
+ WorkerOffer("executor2", "host2", executorCpus, None, new
ExecutorResourcesAmounts(
Map(GPU -> toInternalResource(Map("2" -> 1.0))))),
- WorkerOffer("executor3", "host3", 50, None, new
ExecutorResourcesAmounts(
+ WorkerOffer("executor3", "host3", executorCpus, None, new
ExecutorResourcesAmounts(
Map(GPU -> toInternalResource(Map("3" -> 1.0)))))
)
taskScheduler.submitTasks(lowerTaskSet)
taskScheduler.submitTasks(higherRpTaskSet)
- // should have 3 for default profile and 2 for additional resource profile
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ // First, offer the available resources to the higher priority task set, where each task
+ // requires 0.7 GPU, so Spark can assign the GPU resources to a maximum of 4 tasks, leaving
+ // 0.3 of each GPU available.
+ // Then offer the remaining resources to the lower priority task set, where each
+ // task requires 0.3 GPU, so the leftover GPU resources can be offered to a maximum
+ // of 4 lower priority tasks.
+ var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+ // The original taskDescriptions has the layout below:
+ // Executor 0, executor 1, executor 2, executor 3
+ // index 0, index 0, index 1, index 1, index 2, index 2, index 3, index 3
+ // task_0.7, task_0.3, task_0.7, task_0.3, task_0.7, task_0.3, task_0.7, task_0.3
+ // After sorting, it should be
+ // Executor 0, executor 1, executor 2, executor 3
+ // index 0, index 1, index 2, index 3
+ // task_0.7, task_0.7, task_0.7, task_0.7
+ // task_0.3, task_0.3, task_0.3, task_0.3
+ taskDescriptions = taskDescriptions.sortBy(_.taskId)
assert(8 === taskDescriptions.length)
var index = 0
@@ -2591,32 +2600,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
for (tDesc <- taskDescriptions) {
assert(tDesc.resources.contains(GPU))
- val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+ val addresses = tDesc.resources(GPU).keys.toArray.sorted
assert(addresses.length == 1)
val address = addresses(0)
- // Executor 0, executor 1, executor 2, executor 3
- // task_0.7, task_03 task_0.7, task_03 task_0.7, task_03 task_0.7, task_03
- if (index % 2 == 0) {
- higherAssignedExecutorsGpus.append(
- (tDesc.executorId, address))
- assert(ResourceAmountUtils.toFractionalResource(
- tDesc.resources.get(GPU).get(address)) == 0.7)
+
+ if (index / 4 == 0) {
+ higherAssignedExecutorsGpus.append((tDesc.executorId, address))
+ assert(ResourceAmountUtils.toFractionalResource(tDesc.resources(GPU)(address)) ==
+ higherTaskGpuAmount)
} else {
- lowerAssignedExecutorsGpus.append(
- (tDesc.executorId, address))
- assert(ResourceAmountUtils.toFractionalResource(
- tDesc.resources.get(GPU).get(address)) == 0.3)
+ lowerAssignedExecutorsGpus.append((tDesc.executorId, address))
+ assert(ResourceAmountUtils.toFractionalResource(tDesc.resources(GPU)(address)) ==
+ lowerTaskGpuAmount)
}
index += 1
}
- assert(higherAssignedExecutorsGpus.sorted sameElements
- ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3")
- ))
- assert(lowerAssignedExecutorsGpus.sorted sameElements
- ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3")
- ))
+ assert(higherAssignedExecutorsGpus.sorted ==
+ ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3")))
+ assert(lowerAssignedExecutorsGpus.sorted ==
+ ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3")))
}
test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1
executor " +
@@ -2626,7 +2630,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext
val executorGpus = 4
val executorCpus = 4
- // each tasks require 0.3 gpu
+ // each task requires 0.4 gpu
val taskScheduler = setupScheduler(numCores = executorCpus,
config.CPUS_PER_TASK.key -> taskCpus.toString,
TASK_GPU_ID.amountConf -> taskGpus.toString,
@@ -2636,8 +2640,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
- // each task require 0.7 gpu
- val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+ // each task requires 0.7 gpu
+ val higherTaskGpuAmount = 0.7
+ val treqs = new TaskResourceRequests().cpus(1).resource(GPU, higherTaskGpuAmount)
val rp = new TaskResourceProfile(treqs.requests)
taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
@@ -2654,18 +2659,21 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
taskScheduler.submitTasks(lowerTaskSet)
taskScheduler.submitTasks(higherRpTaskSet)
- // only offer the resources to the higher taskset
+ // First, offer the available resources to the higher priority task set, where each task
+ // requires 0.7 GPU, so Spark can assign the GPU resources to a maximum of 4 tasks, leaving
+ // 0.3 of each GPU available.
+ // Then try to offer the remaining resources to the lower priority task set, where each
+ // task requires 0.4 GPU, but only 0.3 of each address is available, so no resources
+ // can be offered to the lower priority tasks.
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
assert(4 === taskDescriptions.length)
- var index = 0
for (tDesc <- taskDescriptions) {
assert(tDesc.resources.contains(GPU))
- val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+ val addresses = tDesc.resources(GPU).keys.toArray.sorted
assert(addresses.length == 1)
- assert(addresses(0) == index.toString)
+ assert(addresses(0) == tDesc.index.toString)
assert(ResourceAmountUtils.toFractionalResource(
- tDesc.resources.get(GPU).get(index.toString)) == 0.7)
- index += 1
+ tDesc.resources(GPU)(tDesc.index.toString)) == higherTaskGpuAmount)
}
}
@@ -2677,9 +2685,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
val numFreeCores = 3
val workerOffers = IndexedSeq(
- new WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625")),
- new WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627")),
- new WorkerOffer("executor2", "host2", numFreeCores,
Some("192.168.0.101:49629")))
+ WorkerOffer("executor0", "host0", numFreeCores,
Some("192.168.0.101:49625")),
+ WorkerOffer("executor1", "host1", numFreeCores,
Some("192.168.0.101:49627")),
+ WorkerOffer("executor2", "host2", numFreeCores,
Some("192.168.0.101:49629")))
val attempt1 = FakeTask.createBarrierTaskSet(3)
// submit attempt 1, offer some resources, all tasks get launched together
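For reference, the fraction bookkeeping asserted by the SPARK-45527 tests above can be modeled in a few lines of plain Scala (a hedged illustration of the arithmetic only, not scheduler code; `available` is a hypothetical map from GPU address to unclaimed fraction):

```scala
// Each GPU address starts fully available (1.0). Higher-priority tasks claim
// 0.7 per address first, leaving 0.3: enough for one 0.3 task per address,
// but not for a 0.4 task.
object GpuFractionSketch {
  def main(args: Array[String]): Unit = {
    val higherTaskGpuAmount = 0.7
    val lowerTaskGpuAmount = 0.3
    var available = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)
    available = available.map { case (addr, amt) => addr -> (amt - higherTaskGpuAmount) }
    assert(available.values.forall(amt => amt + 1e-9 >= lowerTaskGpuAmount)) // 0.3 still fits
    assert(available.values.forall(amt => amt + 1e-9 < 0.4))                 // 0.4 no longer fits
  }
}
```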
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]