[ https://issues.apache.org/jira/browse/SPARK-13316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-13316.
----------------------------------
    Resolution: Not A Problem

I tried to reproduce this as below:

{code}
nc -lk 9999
{code}

{code}
val checkpointDir = Utils.createTempDir().toString
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Duration(1000))
  ssc.checkpoint(checkpointDir)
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
val socketStream = ssc.socketTextStream("localhost", 9999)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(rdd => rdd.collect().foreach(println))
ssc.start()
ssc.awaitTermination()
{code}

It appears to have been fixed in https://github.com/apache/spark/commit/4a5558ca9921ce89b3996e9ead13b07123fc7a2d without a JIRA, so I am resolving this.

> "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13316
>                 URL: https://issues.apache.org/jira/browse/SPARK-13316
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>            Reporter: Jacek Laskowski
>            Priority: Minor
>
> I faced the issue today but [it was already reported on SO|http://stackoverflow.com/q/35090180/1305344] a couple of days ago and the reason is that a dstream is registered after a StreamingContext has been recreated from checkpoint.
> It _appears_ that...no dstreams must be registered after a StreamingContext has been recreated from checkpoint. It is *not* obvious at first.
> The code:
> {code}
> def createStreamingContext(): StreamingContext = {
>   val ssc = new StreamingContext(sparkConf, Duration(1000))
>   ssc.checkpoint(checkpointDir)
>   ssc
> }
> val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
> val socketStream = ssc.socketTextStream(...)
> socketStream.checkpoint(Seconds(1))
> socketStream.foreachRDD(...)
> {code}
> It should be described in docs at the very least and/or checked in the code when the streaming computation starts.
> The exception is as follows:
> {code}
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab has not been initialized
>   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
>   at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
>   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
>   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
>   ... 43 elided
> {code}
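
For reference, a minimal sketch of the arrangement the issue description points towards: every DStream is registered inside the factory function passed to StreamingContext.getOrCreate, so nothing is added to the context after it may have been recovered from the checkpoint. This follows the checkpointing pattern in the Spark Streaming programming guide and is not taken from the commit referenced in the resolution. The object name, checkpoint path, master URL, and application name are assumptions made for illustration, and the socket source expects something like the nc listener from the reproduction to be running.

{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver object; the name is illustrative only.
object CheckpointedSocketStream {
  // Assumed checkpoint location; any directory writable by the driver works.
  val checkpointDir = "/tmp/spark-13316-checkpoint"

  // Called only when no checkpoint exists in checkpointDir.
  // All DStreams and output operations are set up here, before the context is returned.
  def createStreamingContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[2]")      // assumed: local run with two threads
      .setAppName("SPARK-13316")  // assumed application name
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val socketStream = ssc.socketTextStream("localhost", 9999)
    socketStream.checkpoint(Seconds(1))
    socketStream.foreachRDD(rdd => rdd.collect().foreach(println))
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context, including its DStream graph, from the checkpoint
    // if one exists; otherwise builds a fresh one via the factory.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createStreamingContext())
    // No DStreams are registered at this point.
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

With this layout the first run builds the graph through the factory, and a restart with the same checkpoint directory reuses the recovered graph instead of adding new streams to it.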