Repository: spark Updated Branches: refs/heads/master 70d495dce -> 4cb49412d
[SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler ## What changes were proposed in this pull request? Right now we serialize the empty task metrics once per task – Since this is shared across all tasks we could use the same serialized task metrics across all tasks of a stage. ## How was this patch tested? - [x] Run tests on EC2 to measure performance improvement Author: Shivaram Venkataraman <[email protected]> Closes #16261 from shivaram/task-metrics-one-copy. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cb49412 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cb49412 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cb49412 Branch: refs/heads/master Commit: 4cb49412d1d7d10ffcc738475928c7de2bc59fd4 Parents: 70d495d Author: Shivaram Venkataraman <[email protected]> Authored: Mon Dec 19 14:53:01 2016 -0800 Committer: Kay Ousterhout <[email protected]> Committed: Mon Dec 19 14:53:01 2016 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- .../scala/org/apache/spark/scheduler/ResultTask.scala | 9 +++++---- .../org/apache/spark/scheduler/ShuffleMapTask.scala | 11 ++++++----- .../src/main/scala/org/apache/spark/scheduler/Task.scala | 10 ++++++++-- .../scala/org/apache/spark/executor/ExecutorSuite.scala | 4 +++- .../test/scala/org/apache/spark/scheduler/FakeTask.scala | 10 +++++++++- .../org/apache/spark/scheduler/TaskContextSuite.scala | 6 ++++-- 7 files changed, 38 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0a1c500..6177baf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1009,13 +1009,14 @@ class DAGScheduler( } val tasks: Seq[Task[_]] = try { + val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId), + taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } @@ -1025,7 +1026,7 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, + taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index d19353f..6abdf0f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -42,7 +42,8 @@ import org.apache.spark.rdd.RDD * @param outputId index of the task in this job (a job can launch tasks on only a subset of the * input RDD's partitions). * @param localProperties copy of thread-local properties set by the user on the driver side. 
- * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side. + * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side + * and sent to executor side. * * The parameters below are optional: * @param jobId id of the job this task belongs to @@ -57,12 +58,12 @@ private[spark] class ResultTask[T, U]( locs: Seq[TaskLocation], val outputId: Int, localProperties: Properties, - metrics: TaskMetrics, + serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, appAttemptId: Option[String] = None) - extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId, - appId, appAttemptId) + extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, + jobId, appId, appAttemptId) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 31011de..994b81e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -42,8 +42,9 @@ import org.apache.spark.shuffle.ShuffleWriter * the type should be (RDD[_], ShuffleDependency[_, _, _]). * @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling - * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. 
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side + * and sent to executor side. * * The parameters below are optional: * @param jobId id of the job this task belongs to @@ -56,18 +57,18 @@ private[spark] class ShuffleMapTask( taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient private var locs: Seq[TaskLocation], - metrics: TaskMetrics, localProperties: Properties, + serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, appAttemptId: Option[String] = None) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId, - appId, appAttemptId) + extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties, + serializedTaskMetrics, jobId, appId, appAttemptId) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ def this(partitionId: Int) { - this(0, 0, null, new Partition { override def index: Int = 0 }, null, null, new Properties) + this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null) } @transient private val preferredLocs: Seq[TaskLocation] = { http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/Task.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1554200..5becca6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -48,6 +48,8 @@ import org.apache.spark.util._ * @param partitionId index of the number in the RDD * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. 
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side + * and sent to executor side. * * The parameters below are optional: * @param jobId id of the job this task belongs to @@ -58,13 +60,17 @@ private[spark] abstract class Task[T]( val stageId: Int, val stageAttemptId: Int, val partitionId: Int, - // The default value is only used in tests. - val metrics: TaskMetrics = TaskMetrics.registered, @transient var localProperties: Properties = new Properties, + // The default value is only used in tests. + serializedTaskMetrics: Array[Byte] = + SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(), val jobId: Option[Int] = None, val appId: Option[String] = None, val appAttemptId: Option[String] = None) extends Serializable { + @transient lazy val metrics: TaskMetrics = + SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics)) + /** * Called by [[org.apache.spark.executor.Executor]] to run this task. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 683eeee..742500d 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -51,9 +51,11 @@ class ExecutorSuite extends SparkFunSuite { when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem) when(mockEnv.memoryManager).thenReturn(mockMemoryManager) when(mockEnv.closureSerializer).thenReturn(serializer) + val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array() + val serializedTask = Task.serializeWithDependencies( - new FakeTask(0, 0), + new FakeTask(0, 0, Nil, fakeTaskMetrics), HashMap[String, Long](), HashMap[String, Long](), serializer.newInstance()) http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index a757041..fe6de2b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -17,12 +17,20 @@ package org.apache.spark.scheduler +import java.util.Properties + +import org.apache.spark.SparkEnv import org.apache.spark.TaskContext +import org.apache.spark.executor.TaskMetrics class FakeTask( stageId: Int, partitionId: Int, - prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) { + prefLocs: Seq[TaskLocation] = Nil, + serializedTaskMetrics: Array[Byte] = + 
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array()) + extends Task[Int](stageId, 0, partitionId, new Properties, serializedTaskMetrics) { + override def runTask(context: TaskContext): Int = 0 override def preferredLocations: Seq[TaskLocation] = prefLocs } http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 9eda79a..7004128 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -62,7 +62,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val func = (c: TaskContext, i: Iterator[String]) => i.next() val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func)))) val task = new ResultTask[String, String]( - 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics) + 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, + closureSerializer.serialize(TaskMetrics.registered).array()) intercept[RuntimeException] { task.run(0, 0, null) } @@ -83,7 +84,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val func = (c: TaskContext, i: Iterator[String]) => i.next() val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func)))) val task = new ResultTask[String, String]( - 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics) + 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, + closureSerializer.serialize(TaskMetrics.registered).array()) intercept[RuntimeException] { task.run(0, 0, null) } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
