This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bade8addd9 [SPARK-43202][YARN] Replace reflection w/ direct calling 
for YARN Resource API
5bade8addd9 is described below

commit 5bade8addd91ee688b0363623210f24117e1583b
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Apr 20 13:01:34 2023 -0700

    [SPARK-43202][YARN] Replace reflection w/ direct calling for YARN Resource 
API
    
    ### What changes were proposed in this pull request?
    
    Replace reflection with direct calls to the YARN Resource API, including
    
    - `org.apache.hadoop.yarn.api.records.ResourceInformation`,
    - `org.apache.hadoop.yarn.exceptions.ResourceNotFoundException`
    
    which were added in 
[YARN-4081](https://issues.apache.org/jira/browse/YARN-4081) (Hadoop 
2.10.0/3.0.0)
    
    ### Why are the changes needed?
    
    Simplify code. Since 
[SPARK-42452](https://issues.apache.org/jira/browse/SPARK-42452) removed 
support for Hadoop 2, we can call those API directly now.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passes the existing GitHub Actions CI.
    
    Closes #40860 from pan3793/SPARK-43197.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../spark/deploy/yarn/ResourceRequestHelper.scala  | 97 ++++------------------
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  5 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 22 ++---
 .../deploy/yarn/ResourceRequestHelperSuite.scala   | 25 +++---
 .../deploy/yarn/ResourceRequestTestHelper.scala    | 71 ++--------------
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 13 +--
 6 files changed, 48 insertions(+), 185 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 5a5334dc763..0dd4e0a6c8a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.lang.{Long => JLong}
-import java.lang.reflect.InvocationTargetException
-
 import scala.collection.mutable
-import scala.util.Try
 
 import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.api.records.ResourceInformation
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.config._
@@ -31,16 +29,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceID
 import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
-import org.apache.spark.util.{CausedBy, Utils}
+import org.apache.spark.util.CausedBy
 
-/**
- * This helper class uses some of Hadoop 3 methods from the YARN API,
- * so we need to use reflection to avoid compile error when building against 
Hadoop 2.x
- */
 private object ResourceRequestHelper extends Logging {
   private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
-  private val RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation"
-  private val RESOURCE_NOT_FOUND = 
"org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
   @volatile private var numResourceErrors: Int = 0
 
   private[yarn] def getYarnResourcesAndAmounts(
@@ -152,23 +144,6 @@ private object ResourceRequestHelper extends Logging {
       return
     }
 
-    if (!isYarnResourceTypesAvailable()) {
-      logWarning("Ignoring custom resource requests because " +
-          "the version of YARN does not support it!")
-      return
-    }
-
-    val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS)
-    val setResourceInformationMethod =
-      try {
-        resource.getClass.getMethod("setResourceInformation", classOf[String], 
resInfoClass)
-      } catch {
-        case e: NoSuchMethodException =>
-          throw new SparkException(
-            s"Cannot find setResourceInformation in ${resource.getClass}. " +
-              "This is likely due to a JAR conflict between different YARN 
versions.", e)
-      }
-
     resources.foreach { case (name, rawAmount) =>
       try {
         val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
@@ -180,32 +155,21 @@ private object ResourceRequestHelper extends Logging {
           case _ => unitPart
         }
         logDebug(s"Registering resource with name: $name, amount: $amount, 
unit: $unit")
-        val resourceInformation = createResourceInformation(name, amount, 
unit, resInfoClass)
-        setResourceInformationMethod.invoke(
-          resource, name, resourceInformation.asInstanceOf[AnyRef])
+        val resourceInformation = createResourceInformation(name, amount, unit)
+        resource.setResourceInformation(name, resourceInformation)
       } catch {
         case _: MatchError =>
           throw new IllegalArgumentException(s"Resource request for '$name' 
('$rawAmount') " +
               s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
         case CausedBy(e: IllegalArgumentException) =>
           throw new IllegalArgumentException(s"Invalid request for $name: 
${e.getMessage}")
-        case e: InvocationTargetException =>
-          if (e.getCause != null) {
-            if (Try(Utils.classForName(RESOURCE_NOT_FOUND)).isSuccess) {
-              if 
(e.getCause().getClass().getName().equals(RESOURCE_NOT_FOUND)) {
-                // warn a couple times and then stop so we don't spam the logs
-                if (numResourceErrors < 2) {
-                  logWarning(s"YARN doesn't know about resource $name, your 
resource discovery " +
-                    s"has to handle properly discovering and isolating the 
resource! Error: " +
-                    s"${e.getCause().getMessage}")
-                  numResourceErrors += 1
-                }
-              } else {
-                throw e.getCause
-              }
-            } else {
-              throw e.getCause
-            }
+        case e: ResourceNotFoundException =>
+          // warn a couple times and then stop so we don't spam the logs
+          if (numResourceErrors < 2) {
+            logWarning(s"YARN doesn't know about resource $name, your resource 
discovery " +
+              s"has to handle properly discovering and isolating the resource! 
Error: " +
+              s"${e.getCause.getMessage}")
+            numResourceErrors += 1
           }
       }
     }
@@ -214,38 +178,11 @@ private object ResourceRequestHelper extends Logging {
   private def createResourceInformation(
       resourceName: String,
       amount: Long,
-      unit: String,
-      resInfoClass: Class[_]): Any = {
-    val resourceInformation =
-      if (unit.nonEmpty) {
-        val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
-          classOf[String], classOf[String], JLong.TYPE)
-        resInfoNewInstanceMethod.invoke(null, resourceName, unit, 
amount.asInstanceOf[JLong])
-      } else {
-        val resInfoNewInstanceMethod = resInfoClass.getMethod("newInstance",
-          classOf[String], JLong.TYPE)
-        resInfoNewInstanceMethod.invoke(null, resourceName, 
amount.asInstanceOf[JLong])
-      }
-    resourceInformation
-  }
-
-  def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
-    try {
-      // Use reflection as this uses APIs only available in Hadoop 3
-      val getResourcesMethod = resource.getClass().getMethod("getResources")
-      val resources = 
getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
-      if (resources.nonEmpty) true else false
-    } catch {
-      case  _: NoSuchMethodException => false
+      unit: String): ResourceInformation = {
+    if (unit.nonEmpty) {
+      ResourceInformation.newInstance(resourceName, unit, amount)
+    } else {
+      ResourceInformation.newInstance(resourceName, amount)
     }
   }
-
-  /**
-   * Checks whether Hadoop 2.x or 3 is used as a dependency.
-   * In case of Hadoop 3 and later, the ResourceInformation class
-   * should be available on the classpath.
-   */
-  def isYarnResourceTypesAvailable(): Boolean = {
-    Try(Utils.classForName(RESOURCE_INFO_CLASS)).isSuccess
-  }
 }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4b55d48cda3..07c4da0ee81 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -508,9 +508,8 @@ private[yarn] class YarnAllocator(
             s" ResourceProfile Id: $rpId, each with " +
             s"${resource.getVirtualCores} core(s) and " +
             s"${resource.getMemory} MB memory."
-          if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
-            ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
-            requestContainerMessage ++= s" with custom resources: " + 
resource.toString
+          if (resource.getResources().nonEmpty) {
+            requestContainerMessage ++= s" with custom resources: $resource"
           }
           logInfo(requestContainerMessage)
         }
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2f7e0406555..b5fb78b106b 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -48,7 +48,6 @@ import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.yarn.ResourceRequestHelper._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceID
@@ -216,9 +215,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
     appContext.getPriority.getPriority should be (1)
   }
 
-  test("specify a more specific type for the application") {
-    // TODO (SPARK-31733) Make this test case pass with hadoop-3
-    assume(!isYarnResourceTypesAvailable)
+  // TODO (SPARK-31733) Make this test case pass with hadoop-3
+  ignore("specify a more specific type for the application") {
     // When the type exceeds 20 characters will be truncated by yarn
     val appTypes = Map(
       1 -> ("", ""),
@@ -468,7 +466,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
     "cluster" -> YARN_DRIVER_RESOURCE_TYPES_PREFIX
   ).foreach { case (deployMode, prefix) =>
     test(s"custom resource request ($deployMode mode)") {
-      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
       val resources = Map("fpga" -> 2, "gpu" -> 3)
       ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
 
@@ -487,14 +484,12 @@ class ClientSuite extends SparkFunSuite with Matchers {
         containerLaunchContext)
 
       resources.foreach { case (name, value) =>
-        ResourceRequestTestHelper.getRequestedValue(appContext.getResource, 
name) should be (value)
+        appContext.getResource.getResourceInformation(name).getValue should be 
(value)
       }
     }
   }
 
   test("custom driver resource request yarn config and spark config fails") {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", 
conf.get(YARN_FPGA_DEVICE) -> "fpga")
     ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -516,7 +511,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
   }
 
   test("custom executor resource request yarn config and spark config fails") {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", 
conf.get(YARN_FPGA_DEVICE) -> "fpga")
     ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -539,7 +533,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
 
   test("custom resources spark config mapped to yarn config") {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val yarnMadeupResource = "yarn.io/madeup"
     val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu",
@@ -562,8 +555,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
       new YarnClientApplication(getNewApplicationResponse, appContext),
       containerLaunchContext)
 
-    val yarnRInfo = 
ResourceRequestTestHelper.getResources(newContext.getResource)
-    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> 
rInfo.value)).toMap
+    val yarnRInfo = newContext.getResource.getResources
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
     assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty)
     assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3)
     assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty)
@@ -573,7 +566,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
   }
 
   test("gpu/fpga spark resources mapped to custom yarn resources") {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val gpuCustomName = "custom/gpu"
     val fpgaCustomName = "custom/fpga"
@@ -595,8 +587,8 @@ class ClientSuite extends SparkFunSuite with Matchers {
       new YarnClientApplication(getNewApplicationResponse, appContext),
       containerLaunchContext)
 
-    val yarnRInfo = 
ResourceRequestTestHelper.getResources(newContext.getResource)
-    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> 
rInfo.value)).toMap
+    val yarnRInfo = newContext.getResource.getResources
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
     assert(allResourceInfo.get(gpuCustomName).nonEmpty)
     assert(allResourceInfo.get(gpuCustomName).get === 3)
     assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 874cc08d6d6..575784bba34 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -24,7 +24,6 @@ import org.scalatest.matchers.should.Matchers._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.yarn.ResourceRequestHelper._
-import 
org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, 
EXECUTOR_CORES, EXECUTOR_MEMORY}
 import org.apache.spark.resource.ResourceUtils.AMOUNT
@@ -96,24 +95,26 @@ class ResourceRequestHelperSuite extends SparkFunSuite with 
Matchers {
   }
 
   Seq(
-    "value with unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 2, "G")),
-    "value without unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "")),
-    "multiple resources" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "m"),
-      ResourceInformation(CUSTOM_RES_2, 10, "G"))
+    "value with unit" -> Seq((CUSTOM_RES_1, 2, "G")),
+    "value without unit" -> Seq((CUSTOM_RES_1, 123, "")),
+    "multiple resources" -> Seq((CUSTOM_RES_1, 123, "m"), (CUSTOM_RES_2, 10, 
"G"))
   ).foreach { case (name, resources) =>
     test(s"valid request: $name") {
-      assume(isYarnResourceTypesAvailable())
-      val resourceDefs = resources.map { r => r.name }
-      val requests = resources.map { r => (r.name, r.value.toString + r.unit) 
}.toMap
+      val resourceDefs = resources.map { case (rName, _, _) => rName }
+      val requests = resources.map { case (rName, rValue, rUnit) =>
+        (rName, rValue.toString + rUnit)
+      }.toMap
 
       ResourceRequestTestHelper.initializeResourceTypes(resourceDefs)
 
       val resource = createResource()
       setResourceRequests(requests, resource)
 
-      resources.foreach { r =>
-        val requested = 
ResourceRequestTestHelper.getResourceInformationByName(resource, r.name)
-        assert(requested === r)
+      resources.foreach { case (rName, rValue, rUnit) =>
+        val requested = resource.getResourceInformation(rName)
+        assert(requested.getName === rName)
+        assert(requested.getValue === rValue)
+        assert(requested.getUnits === rUnit)
       }
     }
   }
@@ -124,7 +125,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with 
Matchers {
     ("invalid unit", CUSTOM_RES_1, "123ppp")
   ).foreach { case (name, key, value) =>
     test(s"invalid request: $name") {
-      assume(isYarnResourceTypesAvailable())
       ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
 
       val resource = createResource()
@@ -147,7 +147,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with 
Matchers {
     NEW_CONFIG_DRIVER_CORES -> "1G"
   ).foreach { case (key, value) =>
     test(s"disallowed resource request: $key") {
-      assume(isYarnResourceTypesAvailable())
       val conf = new SparkConf(false).set(key, value)
       val thrown = intercept[SparkException] {
         validateResources(conf)
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
index 796e0990323..19c842f0928 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -19,17 +19,11 @@ package org.apache.spark.deploy.yarn
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.yarn.api.records.Resource
-
-import org.apache.spark.util.Utils
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo
+import org.apache.hadoop.yarn.util.resource.ResourceUtils
 
 object ResourceRequestTestHelper {
   def initializeResourceTypes(resourceTypes: Seq[String]): Unit = {
-    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
-      throw new IllegalStateException("This method should not be invoked " +
-        "since YARN resource types is not available because of old Hadoop 
version!" )
-    }
-
     // ResourceUtils.reinitializeResources() is the YARN-way
     // to specify resources for the execution of the tests.
     // This method should receive standard resources with names of memory-mb 
and vcores.
@@ -37,64 +31,11 @@ object ResourceRequestTestHelper {
     // with different names e.g. memory, YARN would throw various exceptions
     // because it relies on that standard resources are always specified.
     val defaultResourceTypes = List(
-      createResourceTypeInfo("memory-mb"),
-      createResourceTypeInfo("vcores"))
-    val customResourceTypes = resourceTypes.map(createResourceTypeInfo)
+      ResourceTypeInfo.newInstance("memory-mb"),
+      ResourceTypeInfo.newInstance("vcores"))
+    val customResourceTypes = resourceTypes.map(ResourceTypeInfo.newInstance)
     val allResourceTypes = defaultResourceTypes ++ customResourceTypes
 
-    val resourceUtilsClass =
-      Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
-    val reinitializeResourcesMethod = 
resourceUtilsClass.getMethod("reinitializeResources",
-      classOf[java.util.List[AnyRef]])
-    reinitializeResourcesMethod.invoke(null, allResourceTypes.asJava)
-  }
-
-  private def createResourceTypeInfo(resourceName: String): AnyRef = {
-    val resTypeInfoClass = 
Utils.classForName("org.apache.hadoop.yarn.api.records.ResourceTypeInfo")
-    val resTypeInfoNewInstanceMethod = 
resTypeInfoClass.getMethod("newInstance", classOf[String])
-    resTypeInfoNewInstanceMethod.invoke(null, resourceName)
-  }
-
-  def getRequestedValue(res: Resource, rtype: String): AnyRef = {
-    val resourceInformation = getResourceInformation(res, rtype)
-    invokeMethod(resourceInformation, "getValue")
-  }
-
-  def getResourceInformationByName(res: Resource, nameParam: String): 
ResourceInformation = {
-    val resourceInformation: AnyRef = getResourceInformation(res, nameParam)
-    val name = invokeMethod(resourceInformation, 
"getName").asInstanceOf[String]
-    val value = invokeMethod(resourceInformation, 
"getValue").asInstanceOf[Long]
-    val units = invokeMethod(resourceInformation, 
"getUnits").asInstanceOf[String]
-    ResourceInformation(name, value, units)
+    ResourceUtils.reinitializeResources(allResourceTypes.asJava)
   }
-
-  private def getResourceInformation(res: Resource, name: String): AnyRef = {
-    if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
-      throw new IllegalStateException("assertResourceTypeValue() should not be 
invoked " +
-        "since yarn resource types is not available because of old Hadoop 
version!")
-    }
-
-    val getResourceInformationMethod = 
res.getClass.getMethod("getResourceInformation",
-      classOf[String])
-    val resourceInformation = getResourceInformationMethod.invoke(res, name)
-    resourceInformation
-  }
-
-  private def invokeMethod(resourceInformation: AnyRef, methodName: String): 
AnyRef = {
-    val getValueMethod = resourceInformation.getClass.getMethod(methodName)
-    getValueMethod.invoke(resourceInformation)
-  }
-
-  def getResources(res: Resource): Array[ResourceInformation] = {
-    val getResourceInformationMethod = res.getClass.getMethod("getResources")
-    val rInfoArray = 
getResourceInformationMethod.invoke(res).asInstanceOf[Array[AnyRef]]
-    rInfoArray.map { rInfo =>
-      val name = invokeMethod(rInfo, "getName").asInstanceOf[String]
-      val value = invokeMethod(rInfo, "getValue").asInstanceOf[Long]
-      val units = invokeMethod(rInfo, "getUnits").asInstanceOf[String]
-      ResourceInformation(name, value, units)
-    }
-  }
-
-  case class ResourceInformation(name: String, value: Long, unit: String)
 }
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 59b1e57aa5e..f2b4222b85e 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -188,7 +188,6 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
   }
 
   test("single container allocated with ResourceProfile") {
-    assume(isYarnResourceTypesAvailable())
     val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE))
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     // create default profile so we get a different id to test below
@@ -223,7 +222,6 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
   }
 
   test("multiple containers allocated with ResourceProfiles") {
-    assume(isYarnResourceTypesAvailable())
     val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), 
sparkConf.get(YARN_FPGA_DEVICE))
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     // create default profile so we get a different id to test below
@@ -275,7 +273,6 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
   }
 
   test("custom resource requested from yarn") {
-    assume(isYarnResourceTypesAvailable())
     ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
 
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
@@ -299,7 +296,6 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
   }
 
   test("custom spark resource mapped to yarn resource configs") {
-    assume(isYarnResourceTypesAvailable())
     val yarnMadeupResource = "yarn.io/madeup"
     val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), 
sparkConf.get(YARN_FPGA_DEVICE),
       yarnMadeupResource)
@@ -314,8 +310,8 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
 
     handler.updateResourceRequests()
     val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-    val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
-    val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) 
).toMap
+    val yarnRInfo = defaultResource.getResources
+    val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName -> 
rInfo.getValue) ).toMap
     assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty)
     assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3)
     assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty)
@@ -325,7 +321,6 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
   }
 
   test("gpu/fpga spark resource mapped to custom yarn resource") {
-    assume(isYarnResourceTypesAvailable())
     val gpuCustomName = "custom/gpu"
     val fpgaCustomName = "custom/fpga"
     val originalGpu = sparkConf.get(YARN_GPU_DEVICE)
@@ -343,8 +338,8 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers {
 
       handler.updateResourceRequests()
       val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-      val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
-      val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> 
rInfo.value)).toMap
+      val yarnRInfo = defaultResource.getResources
+      val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
       assert(allResourceInfo.get(gpuCustomName).nonEmpty)
       assert(allResourceInfo.get(gpuCustomName).get === 3)
       assert(allResourceInfo.get(fpgaCustomName).nonEmpty)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to