Repository: spark
Updated Branches:
  refs/heads/master 0b9cae424 -> bcfee153b


[SPARK-12837][CORE] reduce network IO for accumulators

Sending accumulators that were never updated back to the driver is wasteful:
merging a zero-value accumulator into its driver-side counterpart is a no-op.
We should send back only the accumulators that were actually updated, to save
network IO.
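
As a standalone illustration (not part of this change), the sketch below uses
the public `LongAccumulator` from `org.apache.spark.util`; the accumulator
names are made up. It shows that merging a never-updated accumulator into the
driver-side one leaves the driver-side state unchanged, so shipping it over
the network buys nothing:

```scala
import org.apache.spark.util.LongAccumulator

// Driver-side accumulator that already holds some updates.
val driverAcc = new LongAccumulator
driverAcc.add(10L)

// Executor-side copy that was never touched while the task ran.
val untouchedExecutorAcc = new LongAccumulator
assert(untouchedExecutorAcc.isZero)

// Merging the zero-value copy is a no-op: sum and count do not change.
driverAcc.merge(untouchedExecutorAcc)
assert(driverAcc.sum == 10L && driverAcc.count == 1L)
```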

Tested with a new test in `TaskContextSuite`.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #12899 from cloud-fan/acc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcfee153
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcfee153
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcfee153

Branch: refs/heads/master
Commit: bcfee153b1cacfe617e602f3b72c0877e0bdf1f7
Parents: 0b9cae4
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue May 10 11:16:31 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue May 10 11:16:56 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala |  2 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  9 ++++-
 .../org/apache/spark/util/AccumulatorV2.scala   |  4 +--
 .../spark/scheduler/TaskContextSuite.scala      | 37 ++++++++++++++++----
 .../spark/sql/execution/metric/SQLMetrics.scala |  6 ++--
 .../spark/sql/execution/ui/SQLListener.scala    |  2 +-
 .../sql/execution/ui/SQLListenerSuite.scala     | 12 +++----
 7 files changed, 51 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7f4652c..1893167 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -218,7 +218,7 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
     externalAccums += a

http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 95bcc7b..15f863b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -155,7 +155,14 @@ private[spark] abstract class Task[T](
    */
   def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
     if (context != null) {
-      context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
+      context.taskMetrics.internalAccums.filter { a =>
+        // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
+        // value will be updated at driver side.
+        // Note: internal accumulators representing task metrics always count failed values
+        !a.isZero || a.name == Some(InternalAccumulator.RESULT_SIZE)
+      // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not filter
+      // them out.
+      } ++ context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }
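
The rule this hunk applies to internal (task-metric) accumulators can be
illustrated outside of Spark's task machinery. This is a hedged sketch only;
`touched` and `untouched` are made-up stand-ins, and the real code additionally
keeps RESULT_SIZE even when it is zero (its value is only filled in on the
driver) and always keeps external accumulators such as SQLMetrics:

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

// Stand-ins for internal accumulators attached to a task.
val touched = new LongAccumulator
touched.add(42L)                      // updated while the "task" ran
val untouched = new LongAccumulator   // never updated

// Same filtering idea as collectAccumulatorUpdates: only send back
// accumulators whose state differs from their zero value.
val accums: Seq[AccumulatorV2[_, _]] = Seq(touched, untouched)
val toSend = accums.filterNot(_.isZero)

assert(toSend == Seq(touched))        // the zero-value accumulator is skipped
```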

http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index d8f380e..c487903 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -256,7 +256,7 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
    * Adds v to the accumulator, i.e. increment sum by v and count by 1.
    * @since 2.0.0
    */
-  override def isZero: Boolean = _count == 0L
+  override def isZero: Boolean = _sum == 0L && _count == 0
 
   override def copyAndReset(): LongAccumulator = new LongAccumulator
 
@@ -321,7 +321,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
   private[this] var _sum = 0.0
   private[this] var _count = 0L
 
-  override def isZero: Boolean = _count == 0L
+  override def isZero: Boolean = _sum == 0.0 && _count == 0
 
   override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
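
Because `isZero` now decides whether an internal accumulator is sent back at
all, its semantics matter: it reports whether the accumulator was ever touched,
not whether its current value happens to be zero. A small illustrative check
(not part of the diff) against the public `LongAccumulator`:

```scala
import org.apache.spark.util.LongAccumulator

val acc = new LongAccumulator
assert(acc.isZero)       // freshly created: _sum == 0 and _count == 0

acc.add(0L)              // adds the value zero, but _count becomes 1
assert(!acc.isZero)      // the accumulator has been updated
assert(acc.sum == 0L)    // even though its sum is still zero
```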
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9aca4db..368668b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -168,8 +168,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
 
   test("failed tasks collect only accumulators whose values count during failures") {
     sc = new SparkContext("local", "test")
-    val acc1 = AccumulatorSuite.createLongAccum("x", true)
-    val acc2 = AccumulatorSuite.createLongAccum("y", false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", false)
+    val acc2 = AccumulatorSuite.createLongAccum("y", true)
+    acc1.add(1)
+    acc2.add(1)
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val taskMetrics = TaskMetrics.empty
@@ -185,12 +187,33 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     }
     // First, simulate task success. This should give us all the accumulators.
     val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
-    val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
-    TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+    TaskMetricsSuite.assertUpdatesEquals(accumUpdates1.takeRight(2), Seq(acc1, acc2))
     // Now, simulate task failures. This should give us only the accums that count failed values.
-    val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
-    val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
-    TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+    val accumUpdates2 = task.collectAccumulatorUpdates(taskFailed = true)
+    TaskMetricsSuite.assertUpdatesEquals(accumUpdates2.takeRight(1), Seq(acc2))
+  }
+
+  test("only updated internal accumulators will be sent back to driver") {
+    sc = new SparkContext("local", "test")
+    // Create a dummy task. We won't end up running this; we just want to collect
+    // accumulator updates from it.
+    val taskMetrics = TaskMetrics.empty
+    val task = new Task[Int](0, 0, 0) {
+      context = new TaskContextImpl(0, 0, 0L, 0,
+        new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+        new Properties,
+        SparkEnv.get.metricsSystem,
+        taskMetrics)
+      taskMetrics.incMemoryBytesSpilled(10)
+      override def runTask(tc: TaskContext): Int = 0
+    }
+    val updatedAccums = task.collectAccumulatorUpdates()
+    assert(updatedAccums.length == 2)
+    // the RESULT_SIZE accumulator will be sent back anyway.
+    assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
+    assert(updatedAccums(0).value == 0)
+    assert(updatedAccums(1).name == Some(InternalAccumulator.MEMORY_BYTES_SPILLED))
+    assert(updatedAccums(1).value == 10)
   }
 
   test("localProperties are propagated to executors correctly") {

http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index f82e0b8..7861104 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -66,7 +66,7 @@ private[sql] object SQLMetrics {
 
   def createMetric(sc: SparkContext, name: String): SQLMetric = {
     val acc = new SQLMetric(SUM_METRIC)
-    acc.register(sc, name = Some(name), countFailedValues = true)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
@@ -79,7 +79,7 @@ private[sql] object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
     acc
   }
 
@@ -88,7 +88,7 @@ private[sql] object SQLMetrics {
     // duration(min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = true)
+    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
     acc
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 29c5411..510a2ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -164,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
         taskEnd.taskInfo.taskId,
         taskEnd.stageId,
         taskEnd.stageAttemptId,
-        taskEnd.taskMetrics.accumulators().map(a => a.toInfo(Some(a.value), None)),
+        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
         finishTask = true)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bcfee153/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 5e08658..67e4484 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.mock
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -74,13 +74,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   )
 
   private def createTaskMetrics(accumulatorUpdates: Map[Long, Long]): TaskMetrics = {
-    val metrics = mock(classOf[TaskMetrics])
-    when(metrics.accumulators()).thenReturn(accumulatorUpdates.map { case (id, update) =>
+    val metrics = TaskMetrics.empty
+    accumulatorUpdates.foreach { case (id, update) =>
       val acc = new LongAccumulator
       acc.metadata = AccumulatorMetadata(id, Some(""), true)
-      acc.setValue(update)
-      acc
-    }.toSeq)
+      acc.add(update)
+      metrics.registerAccumulator(acc)
+    }
     metrics
   }
 

