Re: Subtract on rdd2 is throwing below exception

2015-11-05 Thread Yana Kadiyska
subtract is not the issue. Spark is lazy, so a lot of the time you have many,
many lines of code that do not in fact run until you perform some action (in
your case, subtract). As you can see from the stack trace, the NPE is from
joda, which is used in the partitioner (I'm suspecting in Cassandra). But the
short story is that the exception is in one of the many RDDs you're chaining.

What I would suggest is that you force evaluation on your RDDs for
debugging purposes -- e.g. RDD2, mappedRDD, CassandraJoinRDD, subtractedRDD.
I'd try to take a count on all of these and see if you can flush out the issue.
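Something like this (debugging only -- the counts exist purely to force each
intermediate RDD to materialize, using the names from your snippets):

CassandraJoinRDD.count()  // does the join itself succeed?
RDD2.count()              // does the filter step succeed?
mappedCRdd.count()
subtractedRdd.count()     // presumably this is where the NPE surfaces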
Another place to look is your Cassandra table -- my guess would be that you
use time as part of a partition key somewhere and the data in the field
you're using is no good...
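If it does turn out to be a bad date, here is a rough way to flush out the
offending records (a sketch only -- it counts tuples whose hashCode throws,
which is exactly the call subtract's HashMap is making in your trace):

import scala.util.Try
// A null joda field inside the object makes hashCode NPE;
// Try turns that exception into a Failure we can count.
val broken = RDD1.filter { case (_, _, booking) => Try(booking.hashCode).isFailure }
println("records whose hashCode throws: " + broken.count())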


Subtract on rdd2 is throwing below exception

2015-11-05 Thread Priya Ch
Hi All,


I am seeing an exception when trying to subtract 2 RDDs.

Let's say RDD1 has messages like:

*pnr,  bookingId,  BookingObject*
 101,   1,   BookingObject1   // event number is 0
 102,   1,   BookingObject2   // event number is 0
 103,   2,   BookingObject3   // event number is 1

RDD1 is of type RDD[(String, Int, Booking)].
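For concreteness, a simplified version of how such an RDD might be built
(Booking here is a stand-in for my real case class; sc is the SparkContext):

import org.apache.spark.rdd.RDD
case class Booking(eventNumber: Int)  // simplified stand-in
val RDD1: RDD[(String, Int, Booking)] = sc.parallelize(Seq(
  ("101", 1, Booking(0)),
  ("102", 1, Booking(0)),
  ("103", 2, Booking(1))
))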

The Booking table in Cassandra has (pnr, bookingId) as its primary key.
Let's say the Booking table has the following rows:

*pnr,  bookingId,  eventNumber*
Row1 -  101,   1,  1
Row2 -  103,   2,  0

RDD1.joinWithCassandraTable with the Booking table, on columns pnr and
bookingId, gives me the following CassandraJoinRDD:

(101, 1, BookingObject1), Row1
(103, 2, BookingObject3), Row2
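The join call itself looks roughly like this -- a sketch only; "ks" stands
in for my real keyspace, and the exact .on(...) form may vary by connector
version:

import com.datastax.spark.connector._
// Join RDD1 against the Cassandra Booking table on its primary key columns.
val CassandraJoinRDD = RDD1.joinWithCassandraTable("ks", "booking")
  .on(SomeColumns("pnr", "bookingid"))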

Now on this RDD, I compare the event number of the BookingObject against the
eventNumber in the row and keep only the messages whose event number is
greater than the one in the row. Roughly:
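(The eventNumber accessor on the Booking object below is illustrative; my
real class has more fields:)

// Keep only pairs where the incoming object's event number is newer
// than the eventNumber already stored in Cassandra.
val RDD2 = CassandraJoinRDD.filter { case ((_, _, booking), row) =>
  booking.eventNumber > row.getInt("eventnumber")
}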

RDD2, of type RDD[((String, Int, Booking), CassandraRow)], then contains the
below record:

(103, 2, BookingObject3), Row2.

But I also need pnr 102 from the original RDD, as it does not exist in the DB.
Hence, to get such messages, I subtract the mapped CassandraJoinRDD from the
original RDD, i.e. RDD1, as:

val mappedCRdd = CassandraJoinRDD.map { case (tuple, row) => tuple }
val subtractedRdd = RDD1.subtract(mappedCRdd)

val mappedRdd2 = RDD2.map { case (tuple, row) => tuple }

Now I union this subtractedRdd with mappedRdd2 as:

subtractedRdd.union(mappedRdd2)

But subtract on the RDD is throwing the below exception:


java.lang.NullPointerException
at org.joda.time.LocalDateTime.getValue(LocalDateTime.java:566)
at org.joda.time.base.AbstractPartial.hashCode(AbstractPartial.java:282)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at com.amadeus.ti.models.tof.TOFModel$GCAS.hashCode(TOFModel.scala:14)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63)
at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210)
at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172)
at com.amadeus.ti.models.tof.TOFModel$TAOFRS.hashCode(TOFModel.scala:7)
at java.util.HashMap.hash(HashMap.java:362)
at java.util.HashMap.put(HashMap.java:492)
at org.apache.spark.rdd.SubtractedRDD.org$apache$spark$rdd$SubtractedRDD$$getSeq$1(SubtractedRDD.scala:104)
at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
at org.apache.spark.rdd.SubtractedRDD$$anonfun$compute$1.apply(SubtractedRDD.scala:119)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:116)
at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:119)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

If subtract is the problem, what other way could I achieve this, or is it
something I am doing wrong?

Thanks,
Padma Ch