[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559071#comment-14559071
 ] 

Adrian Tanase commented on SPARK-5206:
--------------------------------------

[~tdas] - are there any plans to make accumulators re-register automatically at 
checkpoint restore? I think this would be a valuable fix even if the values are 
reset to "zero".
For example, we're using them to aggregate some counters that are pushed to 
OpenTSDB after every micro-batch, after which they are re-set to the initial 
value. (to avoid rated metrics in OpenTSDB and keep the counters additive)

I implemented the above suggestion and it works - but it also has a number of 
downsides that make it impractical for a codebase the relies heavily on metrics 
and counters implemented as accumulators:
- it seems to limit usage to either output operations or transformed 
{{DStreams}}
- if prevents using accumulators during transformations (e.g. simple like 
{{map}} or stateful like {{updateStateByKey}})
- one has to wrap all the DStream transformations in {{foreachRDD}} or 
{{transform}}, altering the semantics of the streaming job heavily and 
obscuring the business logic - something that goes against Spark's mantra for 
ease of use

Is there another way that I'm missing in the above pattern, that would simplify 
the implementation by only calling {{getInstance}} once per streaming context?

Thanks!

> Accumulators are not re-registered during recovering from checkpoint
> --------------------------------------------------------------------
>
>                 Key: SPARK-5206
>                 URL: https://issues.apache.org/jira/browse/SPARK-5206
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: vincent ye
>
> I got exception as following while my streaming application restarts from 
> crash from checkpoit:
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR 
> scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 
> 4)
> java.util.NoSuchElementException: key not found: 1
>       at scala.collection.MapLike$class.default(MapLike.scala:228)
>       at scala.collection.AbstractMap.default(Map.scala:58)
>       at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> I guess that an Accumulator is registered to a singleton Accumulators in Line 
> 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code need to be executed in the driver once. But when the application is 
> recovered from checkpoint. It won't be executed in the driver. So when the 
> driver process it at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938),
>  It can't find the Accumulator because it's not re-register during the 
> recovery.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to