I'm using Kryo with these options: -Dspark.shuffle.spill=false -Dspark.storage.memoryFraction=0.4 -Dspark.serializer=org.apache.spark.serializer.KryoSerializer -Dspark.kryo.registrator=com.andrewash.CustomKryoRegistrator
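For reference, the registrator named by that last flag is just an implementation of Spark's KryoRegistrator, roughly along these lines. This is only a sketch: the classes actually registered in com.andrewash.CustomKryoRegistrator are not shown anywhere in this thread, so the registrations below are placeholders.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch of a custom registrator; the real registrations in
// com.andrewash.CustomKryoRegistrator are not shown in the thread,
// so the classes below are placeholders.
class CustomKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register the key/value types that flow through the shuffle so Kryo
    // can serialize them without writing out full class names each time.
    kryo.register(classOf[Tuple2[String, String]])
    kryo.register(classOf[Array[String]])
  }
}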
The data is being read from a .lzo file and written back to another .lzo
file, if that affects things. Does that cover the compression and
serialization libraries question?

I can give master a shot with my repro, but it may be some time now that I
have a workaround. I'm trying to turn something around quickly and have my
own bugs to debug as well :)

Thanks!
Andrew

On Tue, Feb 18, 2014 at 9:02 PM, Andrew Or <andrewo...@gmail.com> wrote:

> Looks like you have a large number of distinct keys. As you suspect, this
> may be due to hash collisions, of which there are only about 4 billion
> possibilities. It could be related to this PR:
> https://github.com/apache/incubator-spark/pull/612.
>
> The other thing is that we had some issues with the behavior of arbitrary
> serialization/compression engines, and this is solved in the PR that
> Mridul referenced. What compression and serialization libraries are you
> using?
>
>
> 2014-02-18 20:56 GMT-08:00 Mridul Muralidharan <mri...@gmail.com>:
>
> > I had not resolved it in time for 0.9 - but IIRC there was a recent PR
> > which fixed bugs in spill [1]: are you able to reproduce this with
> > spark master?
> >
> > Regards,
> > Mridul
> >
> > [1] https://github.com/apache/incubator-spark/pull/533
> >
> > On Wed, Feb 19, 2014 at 9:58 AM, Andrew Ash <and...@andrewash.com> wrote:
> > > I confirmed also that the spill to disk _was_ occurring:
> > >
> > > 14/02/18 22:50:50 WARN collection.ExternalAppendOnlyMap: Spilling
> > > in-memory map of 634 MB to disk (1 time so far)
> > > 14/02/18 22:50:50 WARN collection.ExternalAppendOnlyMap: Spilling
> > > in-memory map of 581 MB to disk (1 time so far)
> > >
> > >
> > > On Tue, Feb 18, 2014 at 8:07 PM, Andrew Ash <and...@andrewash.com> wrote:
> > >
> > >> Hi dev list,
> > >>
> > >> I'm running into an issue where I'm seeing different results from
> > >> Spark when I run with spark.shuffle.spill=false vs leaving it at the
> > >> default (true).
> > >>
> > >> It's on internal data so I can't share my exact repro, but here's
> > >> roughly what I'm doing:
> > >>
> > >> val rdd = sc.textFile(...)
> > >>   .map(l => ... (col1, col2)) // parse CSV into Tuple2[String,String]
> > >>   .distinct
> > >>   .join(
> > >>     sc.textFile(...)
> > >>       .map(l => ... (col1, col2)) // parse CSV into Tuple2[String,String]
> > >>       .distinct
> > >>   )
> > >>   .map { case (k, (v1, v2)) => Seq(v1, k, v2).mkString("|") }
> > >>
> > >> Then I output:
> > >> (rdd.count, rdd.distinct.count)
> > >>
> > >> When I run with spark.shuffle.spill=false I get this:
> > >> (3192729,3192729)
> > >>
> > >> And with spark.shuffle.spill=true I get this:
> > >> (3192931,3192726)
> > >>
> > >> Has anyone else seen any bugs in join-heavy operations while using
> > >> spark.shuffle.spill=true?
> > >>
> > >> My current theory is that I have a hashCode collision between rows
> > >> (unusual, I know) and that AppendOnlyMap does equality based on
> > >> hashCode() + equals(), while ExternalAppendOnlyMap does equality based
> > >> just on hashCode().
> > >>
> > >> Would appreciate some additional eyes on this problem for sure.
> > >>
> > >> Right now I'm looking through the source and tests for AppendOnlyMap
> > >> and ExternalAppendOnlyMap to see if anything jumps out at me.
> > >>
> > >> Thanks!
> > >> Andrew
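The hash-collision theory from the original post above is easy to illustrate outside of Spark: if a map deduplicates keys by hashCode() alone instead of hashCode() plus equals(), two distinct keys that happen to collide get merged into one. The following is a minimal, self-contained plain-Scala sketch of that idea, not the actual AppendOnlyMap or ExternalAppendOnlyMap code:

import scala.collection.mutable

object HashCollisionSketch {
  def main(args: Array[String]): Unit = {
    // Two distinct strings that are well known to share a JVM hash code.
    val a = "Aa"
    val b = "BB"
    assert(a.hashCode == b.hashCode && a != b)

    // Deduplicating by hashCode() + equals(), as a normal hash map does:
    val byEquality = mutable.HashMap[String, Int]()
    Seq(a, b).foreach(k => byEquality(k) = byEquality.getOrElse(k, 0) + 1)

    // Deduplicating by hashCode() alone, the suspected buggy behavior:
    val byHashOnly = mutable.HashMap[Int, Int]()
    Seq(a, b).foreach { k =>
      byHashOnly(k.hashCode) = byHashOnly.getOrElse(k.hashCode, 0) + 1
    }

    println(s"keys kept by hashCode() + equals(): ${byEquality.size}") // 2
    println(s"keys kept by hashCode() only:       ${byHashOnly.size}") // 1 -- the two keys are conflated
  }
}

If the spill path's merge logic really did compare only hash codes, colliding but unequal keys would be conflated, which would be consistent with rdd.distinct.count dropping once spilling kicks in.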