That could be a corner case bug. How do you add the 3rd party library to the class path of the driver? Through spark-submit? Could you give the command you used?
TD

On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> I've also tried the following:
>
>     Configuration hadoopConfiguration = new Configuration();
>     hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");
>
>     JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
>         checkpointDirectory, hadoopConfiguration, factory, false);
>
> but I still get the same exception.
>
> Why does getOrCreate ignore that Hadoop configuration (which is normally
> honored, e.g. when not recovering)?
>
> --
> Emre
>
> On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a Spark Streaming application (using Spark 1.2.1) that listens to
>> an input directory; when new JSON files are copied to that directory, it
>> processes them and writes the results to an output directory.
>>
>> It uses a 3rd party library to process the multi-line JSON files
>> (https://github.com/alexholmes/json-mapreduce). You can see the relevant
>> part of the streaming application at:
>>
>> https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>>
>> When I run this application locally, it works perfectly fine. But then I
>> wanted to test whether it could recover from failure, e.g. if I stopped
>> it right in the middle of processing some files. I started the streaming
>> application, copied 100 files to the input directory, and hit Ctrl+C when
>> it had already processed about 50 files:
>>
>> ...
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>> [Stage 0:==========================================================================================================================> (65 + 4) / 100]
>> ^C
>>
>> Then I started the application again, expecting that it could recover
>> from the checkpoint. For a while it started to read files again, and then
>> gave an exception:
>>
>> ...
>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet
>> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
>> java.io.IOException: Missing configuration value for multilinejsoninputformat.member
>>     at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Since the exception refers to a missing configuration value for
>> "multilinejsoninputformat.member", I think it is about the following line:
>>
>>     ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
>>
>> That is also why I log its value, and as you can see above, just before
>> the exception occurs during recovery, it shows that
>> "multilinejsoninputformat.member" is set to "itemSet". But somehow it is
>> not found during the recovery. This exception happens only when the
>> application tries to recover from a previously interrupted run.
>>
>> I've also tried moving the above line into the "createContext" method,
>> but still got the same exception.
>>
>> Why is that? And how can I work around it?
>>
>> --
>> Emre Sevinç
>> http://www.bigindustries.be/
>
> --
> Emre Sevinc
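Since the value set programmatically on the context's Hadoop configuration is apparently not restored from the checkpoint, one workaround worth trying is to pass it as a `spark.hadoop.*` property: spark-submit copies such properties into `sc.hadoopConfiguration()` when the SparkContext is created, and they travel in the SparkConf rather than in code that only runs on a fresh start. A sketch of such an invocation, which also shows the `--jars` way of putting the 3rd party library on the driver and executor classpath (the jar paths and the main class package below are placeholders, not taken from the original application, and whether this survives checkpoint recovery in Spark 1.2.1 should be verified):

```shell
# Hypothetical spark-submit invocation; jar paths and the main class
# package are placeholders for the actual application.
#
# --jars ships the 3rd party input format to the driver and executors.
# spark.hadoop.* properties are copied into sc.hadoopConfiguration()
# at context creation time, which may make the value available again
# after a restart, unlike a value set only in application code.
spark-submit \
  --class com.example.SchemaValidatorDriver \
  --master local[4] \
  --jars /path/to/json-mapreduce-1.0.jar \
  --conf spark.hadoop.multilinejsoninputformat.member=itemSet \
  /path/to/streaming-app.jar
```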