[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277552#comment-14277552 ]
vincent ye commented on SPARK-5206: ----------------------------------- Hi Tathagata, Accumulator object is created after the StreamingContext (ssc) created using: val counter = ssc.sparkContext.accumulator() The way I create the recoverable ssc is like this: val ssc = StreamingContext.getOrCreate("/spark/applicationName", functionToCreateContext()) def functionToCreateContext = { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val counter = ssc.sparkContext.accumulator(0L, "message received") ..... dstream.foreach(counter += 1) ...... ssc } ssc.start ssc.awaitTermination() If the app is recovering from checkpoint, It won't execute functionToCreateContext. Then the counter of Accumulator won't be instantiated and registered to Accumulators singleton object. But the counter inside dstream.foreach(counter += 1) will be created by deserializer on a remote worker which won't register the counter to the driver. I don't understand what you mean explicity referencing the Accumulator object ...... Thanks > Accumulators are not re-registered during recovering from checkpoint > -------------------------------------------------------------------- > > Key: SPARK-5206 > URL: https://issues.apache.org/jira/browse/SPARK-5206 > Project: Spark > Issue Type: Bug > Components: Streaming > Affects Versions: 1.1.0 > Reporter: vincent ye > > I got exception as following while my streaming application restarts from > crash from checkpoit: > 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR > scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, > 4) > java.util.NoSuchElementException: key not found: 1 > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.mutable.HashMap.apply(HashMap.scala:64) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > I guess that an Accumulator is registered to a singleton Accumulators in Line > 58 of org.apache.spark.Accumulable: > Accumulators.register(this, true) > This code need to be executed in the driver once. But when the application is > recovered from checkpoint. It won't be executed in the driver. So when the > driver process it at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938), > It can't find the Accumulator because it's not re-register during the > recovery. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org