From the code, I think the field "rememberDuration" shouldn't be null: it is verified at start. The only way it could become null is if something changes its value at runtime, but I cannot imagine how that would happen. If you can reproduce the problem, could you add some logging around the place where the exception is thrown?
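To make the logging suggestion concrete, here is a minimal, self-contained sketch of the failure mode as I understand it from the stack trace: taking a maximum over per-stream durations throws a NullPointerException as soon as one of them is null. Everything in it is hypothetical (the class `RememberDurationCheck`, the stand-in `FakeInputStream`, and both helper methods are not Spark classes or APIs); it only shows the kind of check one could log before the failing call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RememberDurationCheck {

    /** Hypothetical stand-in for an input stream; this is NOT a Spark class. */
    static final class FakeInputStream {
        final String name;
        final Long rememberMillis; // null models an uninitialized rememberDuration

        FakeInputStream(String name, Long rememberMillis) {
            this.name = name;
            this.rememberMillis = rememberMillis;
        }
    }

    /** Returns the names of streams whose remember duration is still null. */
    static List<String> findUninitialized(List<FakeInputStream> streams) {
        List<String> names = new ArrayList<String>();
        for (FakeInputStream s : streams) {
            if (s.rememberMillis == null) {
                names.add(s.name);
            }
        }
        return names;
    }

    /** Unchecked maximum: throws NullPointerException if any duration is null. */
    static long maxRememberMillis(List<FakeInputStream> streams) {
        long max = Long.MIN_VALUE;
        for (FakeInputStream s : streams) {
            long value = s.rememberMillis; // unboxing a null Long throws NPE here
            max = Math.max(max, value);
        }
        return max;
    }

    public static void main(String[] args) {
        List<FakeInputStream> streams = Arrays.asList(
                new FakeInputStream("kafka-x", 4000L),
                new FakeInputStream("kafka-y", null));

        // Log the offending streams first instead of failing with a bare NPE.
        List<String> bad = findUninitialized(streams);
        if (!bad.isEmpty()) {
            System.err.println("Streams with null remember duration: " + bad);
        } else {
            System.out.println("Max remember duration (ms): " + maxRememberMillis(streams));
        }
    }
}
```

If a check like this reports a stream with a null duration, the next question is when that stream was registered relative to the start of the streaming context.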
On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> No. this is the only exception that im getting multiple times in my log.
> Also i was reading some other topics earlier but im not faced this NPE.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> I just did a local test with your code, seems everything is fine, the
>> only difference is that I use the master branch, but I don't think it
>> changes a lot in this part. Do you met any other exceptions or errors
>> beside this one? Probably this is due to other exceptions that makes this
>> system unstable.
>>
>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>>> No, i dont have any special settings. if i keep only reading line in my
>>> code, it's throwing NPE.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>
>>>> Do you have any special settings, from your code, I don't think it will
>>>> incur NPE at that place.
>>>>
>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>
>>>>> spark version - spark 1.4.1
>>>>>
>>>>> my code snippet:
>>>>>
>>>>> String brokers = "ip:port,ip:port";
>>>>> String topics = "x,y,z";
>>>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>
>>>>> JavaPairInputDStream<String, String> messages =
>>>>>     KafkaUtils.createDirectStream(
>>>>>         jssc,
>>>>>         String.class,
>>>>>         String.class,
>>>>>         StringDecoder.class,
>>>>>         StringDecoder.class,
>>>>>         kafkaParams,
>>>>>         TopicsSet
>>>>>     );
>>>>>
>>>>> messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void> () {
>>>>>     public Void call(JavaPairRDD<String , String> tuple) {
>>>>>         JavaRDD<String>rdd = tuple.values();
>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>         return null;
>>>>>     }
>>>>> });
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>
>>>>>> What Spark version are you using, also a small code snippet of how
>>>>>> you use Spark Streaming would be greatly helpful.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>
>>>>>>> I can able to read and print few lines. Afterthat i'm getting this
>>>>>>> exception. Any idea for this ?
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to read from kafka stream and printing it textfile. I'm
>>>>>>>> using java over spark. I dont know why i'm getting the following
>>>>>>>> exception. Also exception message is very abstract. can anyone
>>>>>>>> please help me ?
>>>>>>>>
>>>>>>>> Log Trace :
>>>>>>>>
>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>>>>> java.lang.NullPointerException
>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>>>>>> java.lang.NullPointerException
>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>