Repository: spark
Updated Branches:
  refs/heads/master 70d495dce -> 4cb49412d


[SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler

## What changes were proposed in this pull request?

Currently, the empty task metrics are serialized once per task. Since the same 
task metrics object is shared by all tasks of a stage, we can serialize it once 
in the DAGScheduler and reuse the serialized bytes for every task in the stage.

## How was this patch tested?

- [x] Run tests on EC2 to measure performance improvement

Author: Shivaram Venkataraman <[email protected]>

Closes #16261 from shivaram/task-metrics-one-copy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cb49412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cb49412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cb49412

Branch: refs/heads/master
Commit: 4cb49412d1d7d10ffcc738475928c7de2bc59fd4
Parents: 70d495d
Author: Shivaram Venkataraman <[email protected]>
Authored: Mon Dec 19 14:53:01 2016 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Mon Dec 19 14:53:01 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala  |  5 +++--
 .../scala/org/apache/spark/scheduler/ResultTask.scala    |  9 +++++----
 .../org/apache/spark/scheduler/ShuffleMapTask.scala      | 11 ++++++-----
 .../src/main/scala/org/apache/spark/scheduler/Task.scala | 10 ++++++++--
 .../scala/org/apache/spark/executor/ExecutorSuite.scala  |  4 +++-
 .../test/scala/org/apache/spark/scheduler/FakeTask.scala | 10 +++++++++-
 .../org/apache/spark/scheduler/TaskContextSuite.scala    |  6 ++++--
 7 files changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0a1c500..6177baf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1009,13 +1009,14 @@ class DAGScheduler(
     }
 
     val tasks: Seq[Task[_]] = try {
+      val serializedTaskMetrics = 
closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
       stage match {
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, stage.latestInfo.taskMetrics, 
properties, Option(jobId),
+              taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
           }
 
@@ -1025,7 +1026,7 @@ class DAGScheduler(
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, id, properties, 
stage.latestInfo.taskMetrics,
+              taskBinary, part, locs, id, properties, serializedTaskMetrics,
               Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
           }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index d19353f..6abdf0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -42,7 +42,8 @@ import org.apache.spark.rdd.RDD
  * @param outputId index of the task in this job (a job can launch tasks on 
only a subset of the
  *                 input RDD's partitions).
  * @param localProperties copy of thread-local properties set by the user on 
the driver side.
- * @param metrics a `TaskMetrics` that is created at driver side and sent to 
executor side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized 
on the driver side
+ *                              and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -57,12 +58,12 @@ private[spark] class ResultTask[T, U](
     locs: Seq[TaskLocation],
     val outputId: Int,
     localProperties: Properties,
-    metrics: TaskMetrics,
+    serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
     appAttemptId: Option[String] = None)
-  extends Task[U](stageId, stageAttemptId, partition.index, metrics, 
localProperties, jobId,
-    appId, appAttemptId)
+  extends Task[U](stageId, stageAttemptId, partition.index, localProperties, 
serializedTaskMetrics,
+    jobId, appId, appAttemptId)
   with Serializable {
 
   @transient private[this] val preferredLocs: Seq[TaskLocation] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 31011de..994b81e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -42,8 +42,9 @@ import org.apache.spark.shuffle.ShuffleWriter
  *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
  * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
- * @param metrics a `TaskMetrics` that is created at driver side and sent to 
executor side.
  * @param localProperties copy of thread-local properties set by the user on 
the driver side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized 
on the driver side
+ *                              and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -56,18 +57,18 @@ private[spark] class ShuffleMapTask(
     taskBinary: Broadcast[Array[Byte]],
     partition: Partition,
     @transient private var locs: Seq[TaskLocation],
-    metrics: TaskMetrics,
     localProperties: Properties,
+    serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
     appAttemptId: Option[String] = None)
-  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, 
localProperties, jobId,
-    appId, appAttemptId)
+  extends Task[MapStatus](stageId, stageAttemptId, partition.index, 
localProperties,
+    serializedTaskMetrics, jobId, appId, appAttemptId)
   with Logging {
 
   /** A constructor used only in test suites. This does not require passing in 
an RDD. */
   def this(partitionId: Int) {
-    this(0, 0, null, new Partition { override def index: Int = 0 }, null, 
null, new Properties)
+    this(0, 0, null, new Partition { override def index: Int = 0 }, null, new 
Properties, null)
   }
 
   @transient private val preferredLocs: Seq[TaskLocation] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1554200..5becca6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -48,6 +48,8 @@ import org.apache.spark.util._
  * @param partitionId index of the number in the RDD
  * @param metrics a `TaskMetrics` that is created at driver side and sent to 
executor side.
  * @param localProperties copy of thread-local properties set by the user on 
the driver side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized 
on the driver side
+ *                              and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -58,13 +60,17 @@ private[spark] abstract class Task[T](
     val stageId: Int,
     val stageAttemptId: Int,
     val partitionId: Int,
-    // The default value is only used in tests.
-    val metrics: TaskMetrics = TaskMetrics.registered,
     @transient var localProperties: Properties = new Properties,
+    // The default value is only used in tests.
+    serializedTaskMetrics: Array[Byte] =
+      
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
     val jobId: Option[Int] = None,
     val appId: Option[String] = None,
     val appAttemptId: Option[String] = None) extends Serializable {
 
+  @transient lazy val metrics: TaskMetrics =
+    
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
+
   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 683eeee..742500d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -51,9 +51,11 @@ class ExecutorSuite extends SparkFunSuite {
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
     when(mockEnv.closureSerializer).thenReturn(serializer)
+    val fakeTaskMetrics = 
serializer.newInstance().serialize(TaskMetrics.registered).array()
+
     val serializedTask =
       Task.serializeWithDependencies(
-        new FakeTask(0, 0),
+        new FakeTask(0, 0, Nil, fakeTaskMetrics),
         HashMap[String, Long](),
         HashMap[String, Long](),
         serializer.newInstance())

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala 
b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index a757041..fe6de2b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -17,12 +17,20 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Properties
+
+import org.apache.spark.SparkEnv
 import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
 
 class FakeTask(
     stageId: Int,
     partitionId: Int,
-    prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 
partitionId) {
+    prefLocs: Seq[TaskLocation] = Nil,
+    serializedTaskMetrics: Array[Byte] =
+      
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
+  extends Task[Int](stageId, 0, partitionId, new Properties, 
serializedTaskMetrics) {
+
   override def runTask(context: TaskContext): Int = 0
   override def preferredLocations: Seq[TaskLocation] = prefLocs
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9eda79a..7004128 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -62,7 +62,8 @@ class TaskContextSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSpark
     val func = (c: TaskContext, i: Iterator[String]) => i.next()
     val taskBinary = 
sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
     val task = new ResultTask[String, String](
-      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new 
TaskMetrics)
+      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties,
+      closureSerializer.serialize(TaskMetrics.registered).array())
     intercept[RuntimeException] {
       task.run(0, 0, null)
     }
@@ -83,7 +84,8 @@ class TaskContextSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSpark
     val func = (c: TaskContext, i: Iterator[String]) => i.next()
     val taskBinary = 
sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
     val task = new ResultTask[String, String](
-      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new 
TaskMetrics)
+      0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties,
+      closureSerializer.serialize(TaskMetrics.registered).array())
     intercept[RuntimeException] {
       task.run(0, 0, null)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to