[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562492#comment-14562492 ]
Tathagata Das commented on SPARK-5206:
--------------------------------------
Good observations on the limitations. Currently there are no immediate
concrete plans to fix this, but I welcome suggestions. The tricky thing is
figuring out the right semantics. Your suggestion of resetting the value to
zero after every batch may suffice for your purpose. However, for other
purposes such resetting would lead to non-intuitive behavior on failure:
rather than failing explicitly (as it does now), the accumulator would
implicitly reset to zero and hide the problem. So I am not convinced that
automatically resetting to zero is a good solution, but I am open to hearing
everyone's opinions on this matter.
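In the meantime, one workaround is to instantiate the accumulator lazily
through a driver-side singleton, so that after recovery from a checkpoint a
fresh accumulator is created and registered again. A minimal sketch of that
pattern follows; the RecoverableCounter object and its use in foreachRDD are
illustrative, not part of Spark's API:

import org.apache.spark.{Accumulator, SparkContext}

// Illustrative helper: lazily (re-)creates the accumulator on the driver.
// After a restart from checkpoint, the first call creates a new accumulator,
// which registers it with the Accumulators singleton again.
object RecoverableCounter {
  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L)
        }
      }
    }
    instance
  }
}

It would be used from within the output operation, so the lookup happens on
the driver for every batch:

stream.foreachRDD { rdd =>
  val counter = RecoverableCounter.getInstance(rdd.sparkContext)
  rdd.foreach(_ => counter += 1L)
}

The downside, of course, is that the recovered accumulator starts from zero
rather than its pre-crash value, which is exactly the semantic question above.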
> Accumulators are not re-registered during recovering from checkpoint
> --------------------------------------------------------------------
>
> Key: SPARK-5206
> URL: https://issues.apache.org/jira/browse/SPARK-5206
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: vincent ye
>
> I got the following exception while my streaming application restarted from
> a checkpoint after a crash:
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 4)
> java.util.NoSuchElementException: key not found: 1
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> I guess that an Accumulator is registered with the singleton Accumulators
> object at line 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code needs to be executed once on the driver. But when the application
> is recovered from a checkpoint, it is not executed again. So when the driver
> processes the task completion at
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938),
> it cannot find the Accumulator, because it was never re-registered during
> the recovery.
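> A minimal sketch that reproduces this (the application structure, names,
> and the socket source below are illustrative):
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object AccumulatorRecoveryRepro {
>   def main(args: Array[String]): Unit = {
>     val checkpointDir = "/tmp/accumulator-repro"
>
>     def createContext(): StreamingContext = {
>       val sc = new SparkContext(new SparkConf().setAppName("repro"))
>       // Creating the accumulator runs Accumulators.register(this, true)
>       // on the driver.
>       val counter = sc.accumulator(0L)
>       val ssc = new StreamingContext(sc, Seconds(1))
>       ssc.checkpoint(checkpointDir)
>       ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
>         rdd.foreach(_ => counter += 1L) // closure captures the accumulator
>       }
>       ssc
>     }
>
>     // On restart after a crash, getOrCreate rebuilds the DStream graph
>     // from the checkpoint and createContext() is NOT called, so the
>     // accumulator captured in the recovered closure is never re-registered
>     // on the driver; task completion then fails with the
>     // NoSuchElementException shown above.
>     val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }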