From the code, I think this field "rememberDuration" shouldn't be null; it
is verified at start-up, unless something changes its value at runtime and
makes it null, but I cannot imagine how that would happen. Maybe you
could add some logs around the place where the exception happens, if you can
reproduce it.
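
To illustrate the kind of logging I mean, here is a minimal, self-contained
Java sketch. It does not use Spark at all: Duration, StreamInfo and the two
helper methods are hypothetical stand-ins, written under the assumption that
the failing code takes a maximum over each input stream's remember duration
and dereferences it. It only shows how one unset duration produces an NPE in
the max computation and how a check beforehand can report which stream is
affected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RememberDurationCheck {

    // Hypothetical stand-in for a duration value; this is not Spark's class.
    static final class Duration {
        final long milliseconds;

        Duration(long milliseconds) {
            this.milliseconds = milliseconds;
        }
    }

    // Hypothetical stand-in for an input stream registered in the graph.
    static final class StreamInfo {
        final String name;
        final Duration rememberDuration; // null models a duration that was never set

        StreamInfo(String name, Duration rememberDuration) {
            this.name = name;
            this.rememberDuration = rememberDuration;
        }
    }

    // Takes the maximum over all durations. Each duration is dereferenced,
    // so a single null entry throws NullPointerException.
    static long maxRememberMillis(List<StreamInfo> streams) {
        long max = Long.MIN_VALUE;
        for (StreamInfo s : streams) {
            max = Math.max(max, s.rememberDuration.milliseconds);
        }
        return max;
    }

    // Collects the names of streams whose duration is unset, so they can be
    // logged before the maximum is computed.
    static List<String> findUnset(List<StreamInfo> streams) {
        List<String> unset = new ArrayList<>();
        for (StreamInfo s : streams) {
            if (s.rememberDuration == null) {
                unset.add(s.name);
            }
        }
        return unset;
    }

    public static void main(String[] args) {
        List<StreamInfo> streams = Arrays.asList(
                new StreamInfo("stream-0", new Duration(2000L)),
                new StreamInfo("stream-1", null));

        System.out.println("Streams with null rememberDuration: " + findUnset(streams));
        try {
            maxRememberMillis(streams);
        } catch (NullPointerException e) {
            System.out.println("NPE while computing the maximum");
        }
    }
}
```

In a real job the equivalent log line would go wherever you can inspect the
streams just before the failure; the names above are only for illustration.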

On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> No, this is the only exception, and I'm getting it multiple times in my log.
> Also, I was reading some other topics earlier and did not face this NPE.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> I just did a local test with your code and everything seems fine. The
>> only difference is that I use the master branch, but I don't think much
>> has changed in this part. Did you meet any other exceptions or errors
>> besides this one? Probably this is due to other exceptions that make the
>> system unstable.
>>
>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
>> wrote:
>>
>>> No, I don't have any special settings. Even if I keep only the reading
>>> line in my code, it throws the NPE.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> Do you have any special settings? From your code, I don't think it would
>>>> incur an NPE at that place.
>>>>
>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> spark version - spark 1.4.1
>>>>>
>>>>> my code snippet:
>>>>>
>>>>> String brokers = "ip:port,ip:port";
>>>>> String topics = "x,y,z";
>>>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>
>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>         jssc,
>>>>>         String.class,
>>>>>         String.class,
>>>>>         StringDecoder.class,
>>>>>         StringDecoder.class,
>>>>>         kafkaParams,
>>>>>         TopicsSet
>>>>> );
>>>>>
>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>>>         JavaRDD<String> rdd = tuple.values();
>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>         return null;
>>>>>     }
>>>>> });
>>>>>
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> What Spark version are you using? Also, a small code snippet of how
>>>>>> you use Spark Streaming would be greatly helpful.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am able to read and print a few lines. After that I'm getting this
>>>>>>> exception. Any idea about this?
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to read from a Kafka stream and write it to a text file. I'm
>>>>>>>> using Java with Spark. I don't know why I'm getting the following
>>>>>>>> exception. Also, the exception message is very abstract. Can anyone
>>>>>>>> please help me?
>>>>>>>>
>>>>>>>> Log Trace :
>>>>>>>>
>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>>>>> java.lang.NullPointerException
>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>>>>>> java.lang.NullPointerException
>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
