[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559071#comment-14559071 ]
Adrian Tanase commented on SPARK-5206:
--------------------------------------
[~tdas] - are there any plans to make accumulators re-register automatically on
checkpoint restore? I think this would be a valuable fix even if the values are
reset to "zero".
For example, we use them to aggregate counters that are pushed to OpenTSDB
after every micro-batch and then reset to their initial value (to avoid rate
metrics in OpenTSDB and to keep the counters additive).
I implemented the above suggestion and it works - but it also has a number of
downsides that make it impractical for a codebase that relies heavily on
metrics and counters implemented as accumulators:
- it seems to limit usage to either output operations or transformed
{{DStreams}}
- it prevents using accumulators inside transformations (whether simple ones
like {{map}} or stateful ones like {{updateStateByKey}})
- one has to wrap all the DStream transformations in {{foreachRDD}} or
{{transform}}, heavily altering the semantics of the streaming job and
obscuring the business logic - something that goes against Spark's mantra of
ease of use
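For reference, the workaround I implemented follows the lazy-singleton pattern
sketched below, assuming Spark 1.x APIs ({{CounterHolder}}, the accumulator
name, and the OpenTSDB push are illustrative placeholders, not our actual
code):

```scala
// Sketch of the lazy-singleton workaround, assuming Spark 1.x APIs.
// CounterHolder and "eventCounter" are hypothetical names for illustration.
import org.apache.spark.{Accumulator, SparkContext}
import org.apache.spark.streaming.dstream.DStream

object CounterHolder {
  @volatile private var instance: Accumulator[Long] = null

  // Lazily (re-)creates the accumulator on the driver, so it gets registered
  // again after recovery from a checkpoint.
  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      CounterHolder.synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "eventCounter")
        }
      }
    }
    instance
  }
}

// The accumulator can only be fetched inside an output operation such as
// foreachRDD, where a live SparkContext is available after recovery.
def instrument(events: DStream[String]): Unit = {
  events.foreachRDD { rdd =>
    val counter = CounterHolder.getInstance(rdd.sparkContext)
    rdd.foreach(_ => counter += 1L)
    // push counter.value to OpenTSDB here, then reset it for the next batch
    counter.setValue(0L)
  }
}
```

This keeps the counters alive across a restart, but at the cost of the
downsides listed above: every use site has to be pushed into {{foreachRDD}} or
{{transform}}.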
Is there another approach that I'm missing, one that would simplify the pattern
above by calling {{getInstance}} only once per streaming context?
Thanks!
> Accumulators are not re-registered during recovering from checkpoint
> --------------------------------------------------------------------
>
> Key: SPARK-5206
> URL: https://issues.apache.org/jira/browse/SPARK-5206
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: vincent ye
>
> I got the following exception when my streaming application restarted from a
> crash using a checkpoint:
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 4)
> java.util.NoSuchElementException: key not found: 1
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> I guess that an Accumulator is registered to the singleton Accumulators
> object at line 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code needs to be executed on the driver once, but when the application
> is recovered from a checkpoint it is not re-executed there. So when the
> driver processes a task completion at
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938),
> it can't find the Accumulator, because it was not re-registered during the
> recovery.
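The failure mode described in the report can be illustrated with a toy model
of the registry in plain Scala (hypothetical names; not Spark's actual
classes). Registration happens in the constructor, so an instance restored
from a checkpoint - which bypasses normal construction on the new driver -
never re-enters the map, and the later lookup fails exactly like the stack
trace above:

```scala
import scala.collection.mutable

// Toy model of the Accumulators registry (illustrative only).
object Registry {
  val originals = mutable.Map[Long, ToyAccumulator]()

  // Called from the ToyAccumulator constructor, analogous to
  // Accumulators.register(this, true).
  def register(a: ToyAccumulator): Unit = originals(a.id) = a

  // Mirrors the lookup in DAGScheduler.handleTaskCompletion: a mutable.Map
  // apply on a missing key throws java.util.NoSuchElementException.
  def add(id: Long, delta: Long): Unit = originals(id).value += delta
}

class ToyAccumulator(val id: Long) {
  var value: Long = 0L
  Registry.register(this) // runs only when the constructor runs on the driver
}

// Registry.add(1, 1) without a constructed ToyAccumulator(1) throws
// java.util.NoSuchElementException: key not found: 1
```

The point of the sketch: recovery restores the accumulator objects without
running their constructors on the new driver, so the registry map stays empty
and every task-completion update hits the missing key.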