This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5bade8addd9 [SPARK-43202][YARN] Replace reflection w/ direct calling for YARN Resource API 5bade8addd9 is described below commit 5bade8addd91ee688b0363623210f24117e1583b Author: Cheng Pan <cheng...@apache.org> AuthorDate: Thu Apr 20 13:01:34 2023 -0700 [SPARK-43202][YARN] Replace reflection w/ direct calling for YARN Resource API ### What changes were proposed in this pull request? Replace reflection w/ direct calling for YARN Resource API, including - `org.apache.hadoop.yarn.api.records.ResourceInformation`, - `org.apache.hadoop.yarn.exceptions.ResourceNotFoundException` which were added in [YARN-4081](https://issues.apache.org/jira/browse/YARN-4081) (Hadoop 2.10.0/3.0.0) ### Why are the changes needed? Simplify code. Since [SPARK-42452](https://issues.apache.org/jira/browse/SPARK-42452) removed support for Hadoop 2, we can call those API directly now. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #40860 from pan3793/SPARK-43197. Authored-by: Cheng Pan <cheng...@apache.org> Signed-off-by: Chao Sun <sunc...@apple.com> --- .../spark/deploy/yarn/ResourceRequestHelper.scala | 97 ++++------------------ .../apache/spark/deploy/yarn/YarnAllocator.scala | 5 +- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 22 ++--- .../deploy/yarn/ResourceRequestHelperSuite.scala | 25 +++--- .../deploy/yarn/ResourceRequestTestHelper.scala | 71 ++-------------- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 13 +-- 6 files changed, 48 insertions(+), 185 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala index 5a5334dc763..0dd4e0a6c8a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala @@ -17,13 +17,11 @@ package org.apache.spark.deploy.yarn -import java.lang.{Long => JLong} -import java.lang.reflect.InvocationTargetException - import scala.collection.mutable -import scala.util.Try import org.apache.hadoop.yarn.api.records.Resource +import org.apache.hadoop.yarn.api.records.ResourceInformation +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.yarn.config._ @@ -31,16 +29,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceID import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU} -import org.apache.spark.util.{CausedBy, Utils} +import org.apache.spark.util.CausedBy -/** - * This helper class uses some of Hadoop 3 methods from the YARN API, - * so we need to use reflection to avoid compile error when building against Hadoop 2.x - */ private object ResourceRequestHelper extends Logging { private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r - private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation" - private val RESOURCE_NOT_FOUND = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException" @volatile private var numResourceErrors: Int = 0 private[yarn] def getYarnResourcesAndAmounts( @@ -152,23 +144,6 @@ private object ResourceRequestHelper extends Logging { return } - if (!isYarnResourceTypesAvailable()) { - logWarning("Ignoring custom resource requests because " + - "the version of YARN does not support it!") - return - } - - val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS) - val setResourceInformationMethod = - try { - resource.getClass.getMethod("setResourceInformation", classOf[String], resInfoClass) - } catch { - case e: NoSuchMethodException => - throw new SparkException( - s"Cannot find setResourceInformation in ${resource.getClass}. " + - "This is likely due to a JAR conflict between different YARN versions.", e) - } - resources.foreach { case (name, rawAmount) => try { val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount @@ -180,32 +155,21 @@ private object ResourceRequestHelper extends Logging { case _ => unitPart } logDebug(s"Registering resource with name: $name, amount: $amount, unit: $unit") - val resourceInformation = createResourceInformation(name, amount, unit, resInfoClass) - setResourceInformationMethod.invoke( - resource, name, resourceInformation.asInstanceOf[AnyRef]) + val resourceInformation = createResourceInformation(name, amount, unit) + resource.setResourceInformation(name, resourceInformation) } catch { case _: MatchError => throw new IllegalArgumentException(s"Resource request for '$name' ('$rawAmount') " + s"does not match pattern $AMOUNT_AND_UNIT_REGEX.") case CausedBy(e: IllegalArgumentException) => throw new IllegalArgumentException(s"Invalid request for $name: ${e.getMessage}") - case e: InvocationTargetException => - if (e.getCause != null) { - if (Try(Utils.classForName(RESOURCE_NOT_FOUND)).isSuccess) { - if (e.getCause().getClass().getName().equals(RESOURCE_NOT_FOUND)) { - // warn a couple times and then stop so we don't spam the logs - if (numResourceErrors < 2) { - logWarning(s"YARN doesn't know about resource $name, your resource discovery " + - s"has to handle properly discovering and isolating the resource! Error: " + - s"${e.getCause().getMessage}") - numResourceErrors += 1 - } - } else { - throw e.getCause - } - } else { - throw e.getCause - } + case e: ResourceNotFoundException => + // warn a couple times and then stop so we don't spam the logs + if (numResourceErrors < 2) { + logWarning(s"YARN doesn't know about resource $name, your resource discovery " + + s"has to handle properly discovering and isolating the resource! Error: " + + s"${e.getCause.getMessage}") + numResourceErrors += 1 } } } @@ -214,38 +178,11 @@ private object ResourceRequestHelper extends Logging { private def createResourceInformation( resourceName: String, amount: Long, - unit: String, - resInfoClass: Class[_]): Any = { - val resourceInformation = - if (unit.nonEmpty) { - val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance", - classOf[String], classOf[String], JLong.TYPE) - resInfoNewInstanceMethod.invoke(null, resourceName, unit, amount.asInstanceOf[JLong]) - } else { - val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance", - classOf[String], JLong.TYPE) - resInfoNewInstanceMethod.invoke(null, resourceName, amount.asInstanceOf[JLong]) - } - resourceInformation - } - - def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = { - try { - // Use reflection as this uses APIs only available in Hadoop 3 - val getResourcesMethod = resource.getClass().getMethod("getResources") - val resources = getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]] - if (resources.nonEmpty) true else false - } catch { - case _: NoSuchMethodException => false + unit: String): ResourceInformation = { + if (unit.nonEmpty) { + ResourceInformation.newInstance(resourceName, unit, amount) + } else { + ResourceInformation.newInstance(resourceName, amount) } } - - /** - * Checks whether Hadoop 2.x or 3 is used as a dependency. - * In case of Hadoop 3 and later, the ResourceInformation class - * should be available on the classpath. - */ - def isYarnResourceTypesAvailable(): Boolean = { - Try(Utils.classForName(RESOURCE_INFO_CLASS)).isSuccess - } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 4b55d48cda3..07c4da0ee81 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -508,9 +508,8 @@ private[yarn] class YarnAllocator( s" ResourceProfile Id: $rpId, each with " + s"${resource.getVirtualCores} core(s) and " + s"${resource.getMemory} MB memory." - if (ResourceRequestHelper.isYarnResourceTypesAvailable() && - ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) { - requestContainerMessage ++= s" with custom resources: " + resource.toString + if (resource.getResources().nonEmpty) { + requestContainerMessage ++= s" with custom resources: $resource" } logInfo(requestContainerMessage) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 2f7e0406555..b5fb78b106b 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -48,7 +48,6 @@ import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils} -import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceID @@ -216,9 +215,8 @@ class ClientSuite extends SparkFunSuite with Matchers { appContext.getPriority.getPriority should be (1) } - test("specify a more specific type for the application") { - // TODO (SPARK-31733) Make this test case pass with hadoop-3 - assume(!isYarnResourceTypesAvailable) + // TODO (SPARK-31733) Make this test case pass with hadoop-3 + ignore("specify a more specific type for the application") { // When the type exceeds 20 characters will be truncated by yarn val appTypes = Map( 1 -> ("", ""), @@ -468,7 +466,6 @@ class ClientSuite extends SparkFunSuite with Matchers { "cluster" -> YARN_DRIVER_RESOURCE_TYPES_PREFIX ).foreach { case (deployMode, prefix) => test(s"custom resource request ($deployMode mode)") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) val resources = Map("fpga" -> 2, "gpu" -> 3) ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) @@ -487,14 +484,12 @@ class ClientSuite extends SparkFunSuite with Matchers { containerLaunchContext) resources.foreach { case (name, value) => - ResourceRequestTestHelper.getRequestedValue(appContext.getResource, name) should be (value) + appContext.getResource.getResourceInformation(name).getValue should be (value) } } } test("custom driver resource request yarn config and spark config fails") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) - val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", conf.get(YARN_FPGA_DEVICE) -> "fpga") ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) @@ -516,7 +511,6 @@ class ClientSuite extends SparkFunSuite with Matchers { } test("custom executor resource request yarn config and spark config fails") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", conf.get(YARN_FPGA_DEVICE) -> "fpga") ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) @@ -539,7 +533,6 @@ class ClientSuite extends SparkFunSuite with Matchers { test("custom resources spark config mapped to yarn config") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") val yarnMadeupResource = "yarn.io/madeup" val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", @@ -562,8 +555,8 @@ class ClientSuite extends SparkFunSuite with Matchers { new YarnClientApplication(getNewApplicationResponse, appContext), containerLaunchContext) - val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource) - val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap + val yarnRInfo = newContext.getResource.getResources + val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty) assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3) assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty) @@ -573,7 +566,6 @@ class ClientSuite extends SparkFunSuite with Matchers { } test("gpu/fpga spark resources mapped to custom yarn resources") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") val gpuCustomName = "custom/gpu" val fpgaCustomName = "custom/fpga" @@ -595,8 +587,8 @@ class ClientSuite extends SparkFunSuite with Matchers { new YarnClientApplication(getNewApplicationResponse, appContext), containerLaunchContext) - val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource) - val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap + val yarnRInfo = newContext.getResource.getResources + val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap assert(allResourceInfo.get(gpuCustomName).nonEmpty) assert(allResourceInfo.get(gpuCustomName).get === 3) assert(allResourceInfo.get(fpgaCustomName).nonEmpty) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index 874cc08d6d6..575784bba34 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -24,7 +24,6 @@ import org.scalatest.matchers.should.Matchers._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.yarn.ResourceRequestHelper._ -import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY} import org.apache.spark.resource.ResourceUtils.AMOUNT @@ -96,24 +95,26 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { } Seq( - "value with unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 2, "G")), - "value without unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "")), - "multiple resources" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "m"), - ResourceInformation(CUSTOM_RES_2, 10, "G")) + "value with unit" -> Seq((CUSTOM_RES_1, 2, "G")), + "value without unit" -> Seq((CUSTOM_RES_1, 123, "")), + "multiple resources" -> Seq((CUSTOM_RES_1, 123, "m"), (CUSTOM_RES_2, 10, "G")) ).foreach { case (name, resources) => test(s"valid request: $name") { - assume(isYarnResourceTypesAvailable()) - val resourceDefs = resources.map { r => r.name } - val requests = resources.map { r => (r.name, r.value.toString + r.unit) }.toMap + val resourceDefs = resources.map { case (rName, _, _) => rName } + val requests = resources.map { case (rName, rValue, rUnit) => + (rName, rValue.toString + rUnit) + }.toMap ResourceRequestTestHelper.initializeResourceTypes(resourceDefs) val resource = createResource() setResourceRequests(requests, resource) - resources.foreach { r => - val requested = ResourceRequestTestHelper.getResourceInformationByName(resource, r.name) - assert(requested === r) + resources.foreach { case (rName, rValue, rUnit) => + val requested = resource.getResourceInformation(rName) + assert(requested.getName === rName) + assert(requested.getValue === rValue) + assert(requested.getUnits === rUnit) } } } @@ -124,7 +125,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { ("invalid unit", CUSTOM_RES_1, "123ppp") ).foreach { case (name, key, value) => test(s"invalid request: $name") { - assume(isYarnResourceTypesAvailable()) ResourceRequestTestHelper.initializeResourceTypes(Seq(key)) val resource = createResource() @@ -147,7 +147,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { NEW_CONFIG_DRIVER_CORES -> "1G" ).foreach { case (key, value) => test(s"disallowed resource request: $key") { - assume(isYarnResourceTypesAvailable()) val conf = new SparkConf(false).set(key, value) val thrown = intercept[SparkException] { validateResources(conf) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala index 796e0990323..19c842f0928 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala @@ -19,17 +19,11 @@ package org.apache.spark.deploy.yarn import scala.collection.JavaConverters._ -import org.apache.hadoop.yarn.api.records.Resource - -import org.apache.spark.util.Utils +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo +import org.apache.hadoop.yarn.util.resource.ResourceUtils object ResourceRequestTestHelper { def initializeResourceTypes(resourceTypes: Seq[String]): Unit = { - if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) { - throw new IllegalStateException("This method should not be invoked " + - "since YARN resource types is not available because of old Hadoop version!" ) - } - // ResourceUtils.reinitializeResources() is the YARN-way // to specify resources for the execution of the tests. // This method should receive standard resources with names of memory-mb and vcores. @@ -37,64 +31,11 @@ object ResourceRequestTestHelper { // with different names e.g. memory, YARN would throw various exceptions // because it relies on that standard resources are always specified. val defaultResourceTypes = List( - createResourceTypeInfo("memory-mb"), - createResourceTypeInfo("vcores")) - val customResourceTypes = resourceTypes.map(createResourceTypeInfo) + ResourceTypeInfo.newInstance("memory-mb"), + ResourceTypeInfo.newInstance("vcores")) + val customResourceTypes = resourceTypes.map(ResourceTypeInfo.newInstance) val allResourceTypes = defaultResourceTypes ++ customResourceTypes - val resourceUtilsClass = - Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils") - val reinitializeResourcesMethod = resourceUtilsClass.getMethod("reinitializeResources", - classOf[java.util.List[AnyRef]]) - reinitializeResourcesMethod.invoke(null, allResourceTypes.asJava) - } - - private def createResourceTypeInfo(resourceName: String): AnyRef = { - val resTypeInfoClass = Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo") - val resTypeInfoNewInstanceMethod = resTypeInfoClass.getMethod("newInstance", classOf[String]) - resTypeInfoNewInstanceMethod.invoke(null, resourceName) - } - - def getRequestedValue(res: Resource, rtype: String): AnyRef = { - val resourceInformation = getResourceInformation(res, rtype) - invokeMethod(resourceInformation, "getValue") - } - - def getResourceInformationByName(res: Resource, nameParam: String): ResourceInformation = { - val resourceInformation: AnyRef = getResourceInformation(res, nameParam) - val name = invokeMethod(resourceInformation, "getName").asInstanceOf[String] - val value = invokeMethod(resourceInformation, "getValue").asInstanceOf[Long] - val units = invokeMethod(resourceInformation, "getUnits").asInstanceOf[String] - ResourceInformation(name, value, units) + ResourceUtils.reinitializeResources(allResourceTypes.asJava) } - - private def getResourceInformation(res: Resource, name: String): AnyRef = { - if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) { - throw new IllegalStateException("assertResourceTypeValue() should not be invoked " + - "since yarn resource types is not available because of old Hadoop version!") - } - - val getResourceInformationMethod = res.getClass.getMethod("getResourceInformation", - classOf[String]) - val resourceInformation = getResourceInformationMethod.invoke(res, name) - resourceInformation - } - - private def invokeMethod(resourceInformation: AnyRef, methodName: String): AnyRef = { - val getValueMethod = resourceInformation.getClass.getMethod(methodName) - getValueMethod.invoke(resourceInformation) - } - - def getResources(res: Resource): Array[ResourceInformation] = { - val getResourceInformationMethod = res.getClass.getMethod("getResources") - val rInfoArray = getResourceInformationMethod.invoke(res).asInstanceOf[Array[AnyRef]] - rInfoArray.map { rInfo => - val name = invokeMethod(rInfo, "getName").asInstanceOf[String] - val value = invokeMethod(rInfo, "getValue").asInstanceOf[Long] - val units = invokeMethod(rInfo, "getUnits").asInstanceOf[String] - ResourceInformation(name, value, units) - } - } - - case class ResourceInformation(name: String, value: Long, unit: String) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 59b1e57aa5e..f2b4222b85e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -188,7 +188,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { } test("single container allocated with ResourceProfile") { - assume(isYarnResourceTypesAvailable()) val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE)) ResourceRequestTestHelper.initializeResourceTypes(yarnResources) // create default profile so we get a different id to test below @@ -223,7 +222,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { } test("multiple containers allocated with ResourceProfiles") { - assume(isYarnResourceTypesAvailable()) val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), sparkConf.get(YARN_FPGA_DEVICE)) ResourceRequestTestHelper.initializeResourceTypes(yarnResources) // create default profile so we get a different id to test below @@ -275,7 +273,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { } test("custom resource requested from yarn") { - assume(isYarnResourceTypesAvailable()) ResourceRequestTestHelper.initializeResourceTypes(List("gpu")) val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) @@ -299,7 +296,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { } test("custom spark resource mapped to yarn resource configs") { - assume(isYarnResourceTypesAvailable()) val yarnMadeupResource = "yarn.io/madeup" val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), sparkConf.get(YARN_FPGA_DEVICE), yarnMadeupResource) @@ -314,8 +310,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { handler.updateResourceRequests() val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) - val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource) - val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap + val yarnRInfo = defaultResource.getResources + val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName -> rInfo.getValue) ).toMap assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty) assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3) assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty) @@ -325,7 +321,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { } test("gpu/fpga spark resource mapped to custom yarn resource") { - assume(isYarnResourceTypesAvailable()) val gpuCustomName = "custom/gpu" val fpgaCustomName = "custom/fpga" val originalGpu = sparkConf.get(YARN_GPU_DEVICE) @@ -343,8 +338,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { handler.updateResourceRequests() val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) - val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource) - val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap + val yarnRInfo = defaultResource.getResources + val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap assert(allResourceInfo.get(gpuCustomName).nonEmpty) assert(allResourceInfo.get(gpuCustomName).get === 3) assert(allResourceInfo.get(fpgaCustomName).nonEmpty) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org