This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a42d894  [SPARK-29417][CORE] Resource Scheduling - add 
TaskContext.resource java api
a42d894 is described below

commit a42d894a4090c97a90ce23b0989163909ebf548d
Author: Thomas Graves <[email protected]>
AuthorDate: Mon Oct 14 13:27:34 2019 -0700

    [SPARK-29417][CORE] Resource Scheduling - add TaskContext.resource java api
    
    ### What changes were proposed in this pull request?
    We added a TaskContext.resources() api, but I realized this is returning a 
scala Map which is not ideal for access from Java.  Here I add a resourcesJMap 
function which returns a java.util.Map to make it easily accessible from Java.
    
    ### Why are the changes needed?
    Java API access
    
    ### Does this PR introduce any user-facing change?
    <!--
    If yes, please clarify the previous behavior and the change this PR 
proposes - provide the console output, description and/or an example to show 
the behavior difference if possible.
    If no, write 'No'.
    -->
    Yes, new TaskContext function to access from Java
    
    ### How was this patch tested?
    <!--
    If tests were added, say they were added here. Please make sure to add some 
test cases that check the changes thoroughly including negative and positive 
cases if possible.
    If it was tested in a way different from regular unit tests, please clarify 
how you tested step by step, ideally copy and paste-able, so that other 
reviewers can test and check, and descendants can verify in the future.
    If tests were not added, please describe why they were not added and/or why 
it was difficult to add.
    -->
    new unit test
    
    Closes #26083 from tgravescs/SPARK-29417.
    
    Lead-authored-by: Thomas Graves 
<[email protected]>
    Co-authored-by: Thomas Graves <[email protected]>
    Co-authored-by: Thomas Graves <[email protected]>
    Signed-off-by: Xiangrui Meng <[email protected]>
---
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala     | 5 +++++
 core/src/main/scala/org/apache/spark/TaskContext.scala            | 8 ++++++++
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala        | 5 +++++
 .../java/test/org/apache/spark/JavaTaskContextCompileCheck.java   | 5 +++++
 project/MimaExcludes.scala                                        | 3 +++
 5 files changed, 26 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala 
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 5afd8a5..3d36980 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.util.{Properties, Timer, TimerTask}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 
@@ -211,6 +212,10 @@ class BarrierTaskContext private[spark] (
 
   override def resources(): Map[String, ResourceInformation] = 
taskContext.resources()
 
+  override def resourcesJMap(): java.util.Map[String, ResourceInformation] = {
+    resources().asJava
+  }
+
   override private[spark] def killTaskIfInterrupted(): Unit = 
taskContext.killTaskIfInterrupted()
 
   override private[spark] def getKillReason(): Option[String] = 
taskContext.getKillReason()
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 2299c54..fd41fac 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -185,6 +185,14 @@ abstract class TaskContext extends Serializable {
   @Evolving
   def resources(): Map[String, ResourceInformation]
 
+  /**
+   * (java-specific) Resources allocated to the task. The key is the resource 
name and the value
+   * is information about the resource. Please refer to
+   * [[org.apache.spark.resource.ResourceInformation]] for specifics.
+   */
+  @Evolving
+  def resourcesJMap(): java.util.Map[String, ResourceInformation]
+
   @DeveloperApi
   def taskMetrics(): TaskMetrics
 
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala 
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 516fb95..08a58a0 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.Properties
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.executor.TaskMetrics
@@ -101,6 +102,10 @@ private[spark] class TaskContextImpl(
     this
   }
 
+  override def resourcesJMap(): java.util.Map[String, ResourceInformation] = {
+    resources.asJava
+  }
+
   @GuardedBy("this")
   private[spark] override def markTaskFailed(error: Throwable): Unit = 
synchronized {
     if (failed) return
diff --git 
a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java 
b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index 62a0b85..5ce7937 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -17,7 +17,10 @@
 
 package test.org.apache.spark;
 
+import java.util.Map;
+
 import org.apache.spark.TaskContext;
+import org.apache.spark.resource.ResourceInformation;
 import org.apache.spark.util.TaskCompletionListener;
 import org.apache.spark.util.TaskFailureListener;
 
@@ -40,7 +43,9 @@ public class JavaTaskContextCompileCheck {
     tc.stageId();
     tc.stageAttemptNumber();
     tc.taskAttemptId();
+    // this returns a scala Map, so make sure the JMap version give a java 
type back
     tc.resources();
+    Map<String, ResourceInformation> resources = tc.resourcesJMap();
     tc.taskMetrics();
     tc.taskMemoryManager();
     tc.getLocalProperties();
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index bc607b5..53a5a50 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -59,6 +59,9 @@ object MimaExcludes {
     // [SPARK-27366][CORE] Support GPU Resources in Spark job scheduling
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.resources"),
 
+    // [SPARK-29417][CORE] Resource Scheduling - add TaskContext.resource java 
api
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.resourcesJMap"),
+
     // [SPARK-27410][MLLIB] Remove deprecated / no-op mllib.KMeans getRuns, 
setRuns
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.getRuns"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.setRuns"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to