Repository: spark
Updated Branches:
  refs/heads/master b33d6b728 -> b056e8cb0
[SPARK-15010][CORE] new accumulator should be tolerant of local RPC message delivery

## What changes were proposed in this pull request?

In local mode, the RPC framework does not serialize and deserialize messages. We therefore should not call `acc.value` when the driver receives a heartbeat message: the serialization hook of the new accumulator may not be triggered, so the `atDriverSide` flag may not be set.

## How was this patch tested?

Tested locally via the Spark shell.

Author: Wenchen Fan

Closes #12795 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b056e8cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b056e8cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b056e8cb

Branch: refs/heads/master
Commit: b056e8cb0a7c58c3e4d199af3ee13be50305b747
Parents: b33d6b7
Author: Wenchen Fan
Authored: Fri Apr 29 19:01:38 2016 -0700
Committer: Reynold Xin
Committed: Fri Apr 29 19:01:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b056e8cb/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 776a322..8fa4aa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -389,9 +389,14 @@ private[spark] class TaskSchedulerImpl(
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
       accumUpdates.flatMap { case (id, updates) =>
+        // We should call `acc.value` here as we are at driver side now. However, the RPC framework
+        // optimizes local message delivery so that messages do not need to de serialized and
+        // deserialized. This brings trouble to the accumulator framework, which depends on
+        // serialization to set the `atDriverSide` flag. Here we call `acc.localValue` instead to
+        // be more robust about this issue.
+        val accInfos = updates.map(acc => acc.toInfo(Some(acc.localValue), None))
         taskIdToTaskSetManager.get(id).map { taskSetMgr =>
-          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId,
-            updates.map(acc => acc.toInfo(Some(acc.value), None)))
+          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }
     }
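The failure mode described in the commit message can be reproduced outside Spark with a small, self-contained Scala sketch. `ToyAccumulator`, `roundTrip` and `localDelivery` are hypothetical names made up for illustration; they are not Spark APIs. The rule that every Java deserialization flips `atDriverSide` is a simplified assumption modeled on the behaviour this commit describes, not a copy of Spark's accumulator code. A full Java serialization round trip stands in for remote RPC delivery, and handing over the same object reference stands in for the local-mode optimization.

```scala
import java.io._

// Hypothetical, simplified stand-in for a Spark accumulator (not Spark's real class).
class ToyAccumulator extends Serializable {
  // Assumption: true on the driver; flipped each time the object is deserialized.
  var atDriverSide: Boolean = true
  private var sum: Long = 0L

  def add(v: Long): Unit = sum += v

  // Unguarded read: works no matter which side we believe we are on.
  def localValue: Long = sum

  // Guarded read: only allowed when the flag says we are on the driver.
  def value: Long =
    if (atDriverSide) sum
    else throw new UnsupportedOperationException("Can't read accumulator value in task")

  // Java serialization hook: runs only when the object is actually deserialized.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    atDriverSide = !atDriverSide
  }
}

object LocalDeliveryDemo {
  // Simulates a remote RPC hop: a full Java serialization round trip.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Simulates local delivery: the same reference is handed over, so no hook runs.
  def localDelivery[T](obj: T): T = obj

  def main(args: Array[String]): Unit = {
    val driverAcc = new ToyAccumulator
    val taskAcc = roundTrip(driverAcc) // driver -> executor: flag becomes false
    taskAcc.add(42)

    val remote = roundTrip(taskAcc)    // executor -> driver over the wire: flag back to true
    println(remote.value)              // prints 42

    val local = localDelivery(taskAcc) // local mode: no deserialization, flag stays false
    println(local.localValue)          // prints 42
    val failed =
      try { local.value; false }
      catch { case _: UnsupportedOperationException => true }
    println(failed)                    // prints true
  }
}
```

In this sketch the guarded `value` throws on the locally delivered object because the deserialization hook never ran, while `localValue` still returns the accumulated 42. That is the same reasoning the patch follows when it switches the heartbeat handler from `acc.value` to `acc.localValue`.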
