Repository: spark
Updated Branches:
  refs/heads/master b33d6b728 -> b056e8cb0


[SPARK-15010][CORE] new accumulator should be tolerant of local RPC message 
delivery

## What changes were proposed in this pull request?

In local mode, the RPC framework does not serialize and deserialize messages. 
We should therefore not call `acc.value` when a heartbeat message is received, 
because the serialization hook of the new accumulator may not have been 
triggered and the `atDriverSide` flag may not have been set.
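
The failure mode described above can be illustrated with a minimal sketch. 
Everything in it is an assumption made for illustration: `SketchAccumulator`, 
`SerializationSketch`, `roundTrip`, and the flag-flipping logic are 
hypothetical stand-ins, not Spark's actual accumulator code. The sketch only 
shows why a flag maintained by a deserialization hook goes stale when a 
message is delivered without being serialized.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for an accumulator; NOT Spark's real class.
class SketchAccumulator extends Serializable {
  private var sum: Long = 0L
  // Assumption: the flag starts as true where the object is created (the
  // driver) and is flipped every time the object is deserialized.
  private var atDriverSide: Boolean = true

  def add(v: Long): Unit = { sum += v }

  // Readable on either side; does not consult the flag.
  def localValue: Long = sum

  // Readable only while the flag says we are on the driver.
  def value: Long =
    if (atDriverSide) sum
    else throw new UnsupportedOperationException("value is only readable at driver side")

  // Java serialization hook: runs only when the object is actually deserialized.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    atDriverSide = !atDriverSide
  }
}

object SerializationSketch {
  // Simulates one network hop by serializing and deserializing the object.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Driver -> executor: one serialized hop, so the flag is now false.
    val onExecutor = roundTrip(new SketchAccumulator)
    onExecutor.add(5L)

    // Local delivery: the same object reaches the "driver" without a hop,
    // so `value` throws while `localValue` still works.
    println(s"localValue without a hop: ${onExecutor.localValue}")
    println(s"value without a hop fails: ${Try(onExecutor.value).isFailure}")

    // Remote delivery: a second serialized hop flips the flag back.
    println(s"value after a serialized hop: ${roundTrip(onExecutor).value}")
  }
}
```

Under these assumptions, reading `localValue` is safe regardless of how the 
message was delivered, which is the property this patch relies on.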

## How was this patch tested?

Tested locally via the Spark shell.

Author: Wenchen Fan <[email protected]>

Closes #12795 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b056e8cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b056e8cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b056e8cb

Branch: refs/heads/master
Commit: b056e8cb0a7c58c3e4d199af3ee13be50305b747
Parents: b33d6b7
Author: Wenchen Fan <[email protected]>
Authored: Fri Apr 29 19:01:38 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Apr 29 19:01:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b056e8cb/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 776a322..8fa4aa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -389,9 +389,14 @@ private[spark] class TaskSchedulerImpl(
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
       accumUpdates.flatMap { case (id, updates) =>
+        // We should call `acc.value` here as we are at driver side now.  However, the RPC framework
+        // optimizes local message delivery so that messages do not need to be serialized and
+        // deserialized.  This brings trouble to the accumulator framework, which depends on
+        // serialization to set the `atDriverSide` flag.  Here we call `acc.localValue` instead to
+        // be more robust about this issue.
+        val accInfos = updates.map(acc => acc.toInfo(Some(acc.localValue), None))
         taskIdToTaskSetManager.get(id).map { taskSetMgr =>
-          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId,
-            updates.map(acc => acc.toInfo(Some(acc.value), None)))
+          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }
     }

