This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5bade8addd9 [SPARK-43202][YARN] Replace reflection w/ direct calling
for YARN Resource API
5bade8addd9 is described below
commit 5bade8addd91ee688b0363623210f24117e1583b
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Apr 20 13:01:34 2023 -0700
[SPARK-43202][YARN] Replace reflection w/ direct calling for YARN Resource
API
### What changes were proposed in this pull request?
Replace reflection with direct calls to the YARN Resource API, including
- `org.apache.hadoop.yarn.api.records.ResourceInformation`,
- `org.apache.hadoop.yarn.exceptions.ResourceNotFoundException`
which were added in
[YARN-4081](https://issues.apache.org/jira/browse/YARN-4081) (Hadoop
2.10.0/3.0.0)
### Why are the changes needed?
Simplify code. Since
[SPARK-42452](https://issues.apache.org/jira/browse/SPARK-42452) removed
support for Hadoop 2, we can call those APIs directly now.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests pass in GA (GitHub Actions).
Closes #40860 from pan3793/SPARK-43197.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../spark/deploy/yarn/ResourceRequestHelper.scala | 97 ++++------------------
.../apache/spark/deploy/yarn/YarnAllocator.scala | 5 +-
.../org/apache/spark/deploy/yarn/ClientSuite.scala | 22 ++---
.../deploy/yarn/ResourceRequestHelperSuite.scala | 25 +++---
.../deploy/yarn/ResourceRequestTestHelper.scala | 71 ++--------------
.../spark/deploy/yarn/YarnAllocatorSuite.scala | 13 +--
6 files changed, 48 insertions(+), 185 deletions(-)
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 5a5334dc763..0dd4e0a6c8a 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -17,13 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.lang.{Long => JLong}
-import java.lang.reflect.InvocationTargetException
-
import scala.collection.mutable
-import scala.util.Try
import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.api.records.ResourceInformation
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
@@ -31,16 +29,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
-import org.apache.spark.util.{CausedBy, Utils}
+import org.apache.spark.util.CausedBy
-/**
- * This helper class uses some of Hadoop 3 methods from the YARN API,
- * so we need to use reflection to avoid compile error when building against
Hadoop 2.x
- */
private object ResourceRequestHelper extends Logging {
private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
- private val RESOURCE_INFO_CLASS =
"org.apache.hadoop.yarn.api.records.ResourceInformation"
- private val RESOURCE_NOT_FOUND =
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
@volatile private var numResourceErrors: Int = 0
private[yarn] def getYarnResourcesAndAmounts(
@@ -152,23 +144,6 @@ private object ResourceRequestHelper extends Logging {
return
}
- if (!isYarnResourceTypesAvailable()) {
- logWarning("Ignoring custom resource requests because " +
- "the version of YARN does not support it!")
- return
- }
-
- val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS)
- val setResourceInformationMethod =
- try {
- resource.getClass.getMethod("setResourceInformation", classOf[String],
resInfoClass)
- } catch {
- case e: NoSuchMethodException =>
- throw new SparkException(
- s"Cannot find setResourceInformation in ${resource.getClass}. " +
- "This is likely due to a JAR conflict between different YARN
versions.", e)
- }
-
resources.foreach { case (name, rawAmount) =>
try {
val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
@@ -180,32 +155,21 @@ private object ResourceRequestHelper extends Logging {
case _ => unitPart
}
logDebug(s"Registering resource with name: $name, amount: $amount,
unit: $unit")
- val resourceInformation = createResourceInformation(name, amount,
unit, resInfoClass)
- setResourceInformationMethod.invoke(
- resource, name, resourceInformation.asInstanceOf[AnyRef])
+ val resourceInformation = createResourceInformation(name, amount, unit)
+ resource.setResourceInformation(name, resourceInformation)
} catch {
case _: MatchError =>
throw new IllegalArgumentException(s"Resource request for '$name'
('$rawAmount') " +
s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
case CausedBy(e: IllegalArgumentException) =>
throw new IllegalArgumentException(s"Invalid request for $name:
${e.getMessage}")
- case e: InvocationTargetException =>
- if (e.getCause != null) {
- if (Try(Utils.classForName(RESOURCE_NOT_FOUND)).isSuccess) {
- if
(e.getCause().getClass().getName().equals(RESOURCE_NOT_FOUND)) {
- // warn a couple times and then stop so we don't spam the logs
- if (numResourceErrors < 2) {
- logWarning(s"YARN doesn't know about resource $name, your
resource discovery " +
- s"has to handle properly discovering and isolating the
resource! Error: " +
- s"${e.getCause().getMessage}")
- numResourceErrors += 1
- }
- } else {
- throw e.getCause
- }
- } else {
- throw e.getCause
- }
+ case e: ResourceNotFoundException =>
+ // warn a couple times and then stop so we don't spam the logs
+ if (numResourceErrors < 2) {
+ logWarning(s"YARN doesn't know about resource $name, your resource
discovery " +
+ s"has to handle properly discovering and isolating the resource!
Error: " +
+ s"${e.getCause.getMessage}")
+ numResourceErrors += 1
}
}
}
@@ -214,38 +178,11 @@ private object ResourceRequestHelper extends Logging {
private def createResourceInformation(
resourceName: String,
amount: Long,
- unit: String,
- resInfoClass: Class[_]): Any = {
- val resourceInformation =
- if (unit.nonEmpty) {
- val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
- classOf[String], classOf[String], JLong.TYPE)
- resInfoNewInstanceMethod.invoke(null, resourceName, unit,
amount.asInstanceOf[JLong])
- } else {
- val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
- classOf[String], JLong.TYPE)
- resInfoNewInstanceMethod.invoke(null, resourceName,
amount.asInstanceOf[JLong])
- }
- resourceInformation
- }
-
- def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
- try {
- // Use reflection as this uses APIs only available in Hadoop 3
- val getResourcesMethod = resource.getClass().getMethod("getResources")
- val resources =
getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
- if (resources.nonEmpty) true else false
- } catch {
- case _: NoSuchMethodException => false
+ unit: String): ResourceInformation = {
+ if (unit.nonEmpty) {
+ ResourceInformation.newInstance(resourceName, unit, amount)
+ } else {
+ ResourceInformation.newInstance(resourceName, amount)
}
}
-
- /**
- * Checks whether Hadoop 2.x or 3 is used as a dependency.
- * In case of Hadoop 3 and later, the ResourceInformation class
- * should be available on the classpath.
- */
- def isYarnResourceTypesAvailable(): Boolean = {
- Try(Utils.classForName(RESOURCE_INFO_CLASS)).isSuccess
- }
}
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4b55d48cda3..07c4da0ee81 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -508,9 +508,8 @@ private[yarn] class YarnAllocator(
s" ResourceProfile Id: $rpId, each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory."
- if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
- ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
- requestContainerMessage ++= s" with custom resources: " +
resource.toString
+ if (resource.getResources().nonEmpty) {
+ requestContainerMessage ++= s" with custom resources: $resource"
}
logInfo(requestContainerMessage)
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2f7e0406555..b5fb78b106b 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -48,7 +48,6 @@ import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
@@ -216,9 +215,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
appContext.getPriority.getPriority should be (1)
}
- test("specify a more specific type for the application") {
- // TODO (SPARK-31733) Make this test case pass with hadoop-3
- assume(!isYarnResourceTypesAvailable)
+ // TODO (SPARK-31733) Make this test case pass with hadoop-3
+ ignore("specify a more specific type for the application") {
// When the type exceeds 20 characters will be truncated by yarn
val appTypes = Map(
1 -> ("", ""),
@@ -468,7 +466,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
"cluster" -> YARN_DRIVER_RESOURCE_TYPES_PREFIX
).foreach { case (deployMode, prefix) =>
test(s"custom resource request ($deployMode mode)") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
val resources = Map("fpga" -> 2, "gpu" -> 3)
ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -487,14 +484,12 @@ class ClientSuite extends SparkFunSuite with Matchers {
containerLaunchContext)
resources.foreach { case (name, value) =>
- ResourceRequestTestHelper.getRequestedValue(appContext.getResource,
name) should be (value)
+ appContext.getResource.getResourceInformation(name).getValue should be
(value)
}
}
}
test("custom driver resource request yarn config and spark config fails") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu",
conf.get(YARN_FPGA_DEVICE) -> "fpga")
ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -516,7 +511,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
}
test("custom executor resource request yarn config and spark config fails") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu",
conf.get(YARN_FPGA_DEVICE) -> "fpga")
ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -539,7 +533,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
test("custom resources spark config mapped to yarn config") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
val yarnMadeupResource = "yarn.io/madeup"
val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu",
@@ -562,8 +555,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
new YarnClientApplication(getNewApplicationResponse, appContext),
containerLaunchContext)
- val yarnRInfo =
ResourceRequestTestHelper.getResources(newContext.getResource)
- val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name ->
rInfo.value)).toMap
+ val yarnRInfo = newContext.getResource.getResources
+ val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName ->
rInfo.getValue)).toMap
assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty)
assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3)
assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty)
@@ -573,7 +566,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
}
test("gpu/fpga spark resources mapped to custom yarn resources") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
val gpuCustomName = "custom/gpu"
val fpgaCustomName = "custom/fpga"
@@ -595,8 +587,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
new YarnClientApplication(getNewApplicationResponse, appContext),
containerLaunchContext)
- val yarnRInfo =
ResourceRequestTestHelper.getResources(newContext.getResource)
- val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name ->
rInfo.value)).toMap
+ val yarnRInfo = newContext.getResource.getResources
+ val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName ->
rInfo.getValue)).toMap
assert(allResourceInfo.get(gpuCustomName).nonEmpty)
assert(allResourceInfo.get(gpuCustomName).get === 3)
assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 874cc08d6d6..575784bba34 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -24,7 +24,6 @@ import org.scalatest.matchers.should.Matchers._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
-import
org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY,
EXECUTOR_CORES, EXECUTOR_MEMORY}
import org.apache.spark.resource.ResourceUtils.AMOUNT
@@ -96,24 +95,26 @@ class ResourceRequestHelperSuite extends SparkFunSuite with
Matchers {
}
Seq(
- "value with unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 2, "G")),
- "value without unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "")),
- "multiple resources" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "m"),
- ResourceInformation(CUSTOM_RES_2, 10, "G"))
+ "value with unit" -> Seq((CUSTOM_RES_1, 2, "G")),
+ "value without unit" -> Seq((CUSTOM_RES_1, 123, "")),
+ "multiple resources" -> Seq((CUSTOM_RES_1, 123, "m"), (CUSTOM_RES_2, 10,
"G"))
).foreach { case (name, resources) =>
test(s"valid request: $name") {
- assume(isYarnResourceTypesAvailable())
- val resourceDefs = resources.map { r => r.name }
- val requests = resources.map { r => (r.name, r.value.toString + r.unit)
}.toMap
+ val resourceDefs = resources.map { case (rName, _, _) => rName }
+ val requests = resources.map { case (rName, rValue, rUnit) =>
+ (rName, rValue.toString + rUnit)
+ }.toMap
ResourceRequestTestHelper.initializeResourceTypes(resourceDefs)
val resource = createResource()
setResourceRequests(requests, resource)
- resources.foreach { r =>
- val requested =
ResourceRequestTestHelper.getResourceInformationByName(resource, r.name)
- assert(requested === r)
+ resources.foreach { case (rName, rValue, rUnit) =>
+ val requested = resource.getResourceInformation(rName)
+ assert(requested.getName === rName)
+ assert(requested.getValue === rValue)
+ assert(requested.getUnits === rUnit)
}
}
}
@@ -124,7 +125,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with
Matchers {
("invalid unit", CUSTOM_RES_1, "123ppp")
).foreach { case (name, key, value) =>
test(s"invalid request: $name") {
- assume(isYarnResourceTypesAvailable())
ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
val resource = createResource()
@@ -147,7 +147,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with
Matchers {
NEW_CONFIG_DRIVER_CORES -> "1G"
).foreach { case (key, value) =>
test(s"disallowed resource request: $key") {
- assume(isYarnResourceTypesAvailable())
val conf = new SparkConf(false).set(key, value)
val thrown = intercept[SparkException] {
validateResources(conf)
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
index 796e0990323..19c842f0928 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -19,17 +19,11 @@ package org.apache.spark.deploy.yarn
import scala.collection.JavaConverters._
-import org.apache.hadoop.yarn.api.records.Resource
-
-import org.apache.spark.util.Utils
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo
+import org.apache.hadoop.yarn.util.resource.ResourceUtils
object ResourceRequestTestHelper {
def initializeResourceTypes(resourceTypes: Seq[String]): Unit = {
- if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
- throw new IllegalStateException("This method should not be invoked " +
- "since YARN resource types is not available because of old Hadoop
version!" )
- }
-
// ResourceUtils.reinitializeResources() is the YARN-way
// to specify resources for the execution of the tests.
// This method should receive standard resources with names of memory-mb
and vcores.
@@ -37,64 +31,11 @@ object ResourceRequestTestHelper {
// with different names e.g. memory, YARN would throw various exceptions
// because it relies on that standard resources are always specified.
val defaultResourceTypes = List(
- createResourceTypeInfo("memory-mb"),
- createResourceTypeInfo("vcores"))
- val customResourceTypes = resourceTypes.map(createResourceTypeInfo)
+ ResourceTypeInfo.newInstance("memory-mb"),
+ ResourceTypeInfo.newInstance("vcores"))
+ val customResourceTypes = resourceTypes.map(ResourceTypeInfo.newInstance)
val allResourceTypes = defaultResourceTypes ++ customResourceTypes
- val resourceUtilsClass =
- Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
- val reinitializeResourcesMethod =
resourceUtilsClass.getMethod("reinitializeResources",
- classOf[java.util.List[AnyRef]])
- reinitializeResourcesMethod.invoke(null, allResourceTypes.asJava)
- }
-
- private def createResourceTypeInfo(resourceName: String): AnyRef = {
- val resTypeInfoClass =
Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
- val resTypeInfoNewInstanceMethod =
resTypeInfoClass.getMethod("newInstance", classOf[String])
- resTypeInfoNewInstanceMethod.invoke(null, resourceName)
- }
-
- def getRequestedValue(res: Resource, rtype: String): AnyRef = {
- val resourceInformation = getResourceInformation(res, rtype)
- invokeMethod(resourceInformation, "getValue")
- }
-
- def getResourceInformationByName(res: Resource, nameParam: String):
ResourceInformation = {
- val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
- val name = invokeMethod(resourceInformation,
"getName").asInstanceOf[String]
- val value = invokeMethod(resourceInformation,
"getValue").asInstanceOf[Long]
- val units = invokeMethod(resourceInformation,
"getUnits").asInstanceOf[String]
- ResourceInformation(name, value, units)
+ ResourceUtils.reinitializeResources(allResourceTypes.asJava)
}
-
- private def getResourceInformation(res: Resource, name: String): AnyRef = {
- if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
- throw new IllegalStateException("assertResourceTypeValue() should not be
invoked " +
- "since yarn resource types is not available because of old Hadoop
version!")
- }
-
- val getResourceInformationMethod =
res.getClass.getMethod("getResourceInformation",
- classOf[String])
- val resourceInformation = getResourceInformationMethod.invoke(res, name)
- resourceInformation
- }
-
- private def invokeMethod(resourceInformation: AnyRef, methodName: String):
AnyRef = {
- val getValueMethod = resourceInformation.getClass.getMethod(methodName)
- getValueMethod.invoke(resourceInformation)
- }
-
- def getResources(res: Resource): Array[ResourceInformation] = {
- val getResourceInformationMethod = res.getClass.getMethod("getResources")
- val rInfoArray =
getResourceInformationMethod.invoke(res).asInstanceOf[Array[AnyRef]]
- rInfoArray.map { rInfo =>
- val name = invokeMethod(rInfo, "getName").asInstanceOf[String]
- val value = invokeMethod(rInfo, "getValue").asInstanceOf[Long]
- val units = invokeMethod(rInfo, "getUnits").asInstanceOf[String]
- ResourceInformation(name, value, units)
- }
- }
-
- case class ResourceInformation(name: String, value: Long, unit: String)
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 59b1e57aa5e..f2b4222b85e 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -188,7 +188,6 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
}
test("single container allocated with ResourceProfile") {
- assume(isYarnResourceTypesAvailable())
val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE))
ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
// create default profile so we get a different id to test below
@@ -223,7 +222,6 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
}
test("multiple containers allocated with ResourceProfiles") {
- assume(isYarnResourceTypesAvailable())
val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE),
sparkConf.get(YARN_FPGA_DEVICE))
ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
// create default profile so we get a different id to test below
@@ -275,7 +273,6 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
}
test("custom resource requested from yarn") {
- assume(isYarnResourceTypesAvailable())
ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
@@ -299,7 +296,6 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
}
test("custom spark resource mapped to yarn resource configs") {
- assume(isYarnResourceTypesAvailable())
val yarnMadeupResource = "yarn.io/madeup"
val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE),
sparkConf.get(YARN_FPGA_DEVICE),
yarnMadeupResource)
@@ -314,8 +310,8 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
handler.updateResourceRequests()
val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
- val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
- val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value)
).toMap
+ val yarnRInfo = defaultResource.getResources
+ val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName ->
rInfo.getValue) ).toMap
assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty)
assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3)
assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty)
@@ -325,7 +321,6 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
}
test("gpu/fpga spark resource mapped to custom yarn resource") {
- assume(isYarnResourceTypesAvailable())
val gpuCustomName = "custom/gpu"
val fpgaCustomName = "custom/fpga"
val originalGpu = sparkConf.get(YARN_GPU_DEVICE)
@@ -343,8 +338,8 @@ class YarnAllocatorSuite extends SparkFunSuite with
Matchers {
handler.updateResourceRequests()
val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
- val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
- val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name ->
rInfo.value)).toMap
+ val yarnRInfo = defaultResource.getResources
+ val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName ->
rInfo.getValue)).toMap
assert(allResourceInfo.get(gpuCustomName).nonEmpty)
assert(allResourceInfo.get(gpuCustomName).get === 3)
assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]