[ https://issues.apache.org/jira/browse/SPARK-13316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196758#comment-15196758 ]
Apache Spark commented on SPARK-13316:
--------------------------------------
User 'mwws' has created a pull request for this issue:
https://github.com/apache/spark/pull/11753
> "SparkException: DStream has not been initialized" when restoring
> StreamingContext from checkpoint and the dstream is created afterwards
> ----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13316
> URL: https://issues.apache.org/jira/browse/SPARK-13316
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Reporter: Jacek Laskowski
> Priority: Minor
>
> I ran into this issue today, but [it was already reported on
> SO|http://stackoverflow.com/q/35090180/1305344] a couple of days ago. The
> cause is that a dstream is registered after a StreamingContext has been
> recreated from checkpoint.
> It _appears_ that no dstreams may be registered after a StreamingContext
> has been recreated from checkpoint. This is *not* obvious at first.
> The code:
> {code}
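> import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
>
> // Creates a fresh context; used only when no checkpoint exists yet.
> def createStreamingContext(): StreamingContext = {
>   val ssc = new StreamingContext(sparkConf, Duration(1000))
>   ssc.checkpoint(checkpointDir)
>   ssc
> }
> val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
> // The dstream below is registered after getOrCreate, i.e. possibly after the
> // context was recreated from checkpoint, which leads to the exception.
> val socketStream = ssc.socketTextStream(...)
> socketStream.checkpoint(Seconds(1))
> socketStream.foreachRDD(...)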
> {code}
> This restriction should be described in the docs at the very least and/or
> checked in the code when the streaming computation starts.
> The exception is as follows:
> {code}
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab has not been initialized
>   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
>   at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
>   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
>   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
>   ... 43 elided
> {code}
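
For illustration only, here is a minimal sketch of the pattern that the report implies avoids the exception: every dstream is registered inside the function passed to StreamingContext.getOrCreate, so nothing is added to a context that was recreated from checkpoint. This is not taken from the pull request. The object name, checkpoint directory, host, port, master setting and the body of foreachRDD are assumptions chosen for the example, since the original code elides them.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Hypothetical values, chosen only for this example.
  val checkpointDir = "/tmp/spark-13316-checkpoint"
  val host = "localhost"
  val port = 9999

  // All dstreams and output operations are defined here. On a fresh start
  // this function builds the whole graph; on a restart getOrCreate restores
  // the graph from the checkpoint and does not call this function.
  def createStreamingContext(): StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName("CheckpointRecoverySketch")
      .setMaster("local[2]") // assumption: local run with two threads
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val socketStream = ssc.socketTextStream(host, port)
    socketStream.checkpoint(Seconds(1))
    // Placeholder output operation; the original report elides this body.
    socketStream.foreachRDD(rdd => println(s"Batch with ${rdd.count()} records"))
    ssc
  }

  def main(args: Array[String]): Unit = {
    // No dstreams are registered after this call.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The design choice is simply to keep the returned context untouched between getOrCreate and start, so the same code path works for both a first run and a recovery.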