That could be a corner-case bug. How do you add the third-party library to
the classpath of the driver? Through spark-submit? Could you share the
command you used?

TD

On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> I've also tried the following:
>
>     Configuration hadoopConfiguration = new Configuration();
>     hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");
>
>     JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
>         checkpointDirectory, hadoopConfiguration, factory, false);
>
>
> but I still get the same exception.
>
> Why doesn't getOrCreate ignore that Hadoop configuration part (which
> normally works, e.g. when not recovering)?
>
> --
> Emre
>
>
> On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a Spark Streaming application (using Spark 1.2.1) that watches an
>> input directory. When new JSON files are copied to that directory, it
>> processes them and writes them to an output directory.
>>
>> It uses a 3rd party library to process the multi-line JSON files (
>> https://github.com/alexholmes/json-mapreduce). You can see the relevant
>> part of the streaming application at:
>>
>>   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>>
>> When I run this application locally, it works perfectly fine. But then I
>> wanted to test whether it could recover from failure, e.g. if I stopped it
>> right in the middle of processing some files. I started the streaming
>> application, copied 100 files to the input directory, and hit Ctrl+C when
>> it had already processed about 50 files:
>>
>> ...
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>> [Stage 0:==========================================================================================================================> (65 + 4) / 100]
>> ^C
>>
>> Then I started the application again, expecting that it could recover
>> from the checkpoint. For a while it started to read files again and then
>> gave an exception:
>>
>> ...
>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
>> 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -  * * * hadoopConfiguration: itemSet
>> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
>> java.io.IOException: Missing configuration value for multilinejsoninputformat.member
>>     at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Since in the exception it refers to a missing configuration
>> "multilinejsoninputformat.member", I think it is about the following line:
>>
>>    ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
>>
>> This is why I also log its value. As you can see above, just before the
>> exception is thrown during recovery, the log shows that
>> "multilinejsoninputformat.member" is set to "itemSet". But somehow it is
>> not found during the recovery. This exception happens only when the
>> application tries to recover from a previously interrupted run.
>>
>> I've also tried moving the above line into the "createContext" method,
>> but still had the same exception.
>>
>> Why is that?
>>
>> And how can I work around it?
>>
>> --
>> Emre Sevinç
>> http://www.bigindustries.be/
>>
>>
>
>
> --
> Emre Sevinc
>
