For compressing shuffle spills in 0.9, we added a hack so that spills always
use LZF, so your compression library actually shouldn't matter. We did
notice, however, that Kryo was pre-fetching, such that reading in batches led
to some items being lost. We introduced a Kryo-specific hack to work around
this. Although we tested it and the hack sufficed back then, it is entirely
possible that there are corner cases we missed. In any case, PR #533 (after
the 0.9 release) should have taken care of the problem.
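
The pre-fetching behavior is easy to see with plain Kryo outside of Spark. A
standalone sketch (this is not Spark's spill code; it only shows Kryo's Input
buffering past the bytes you asked for, so a second reader on the same stream
comes up empty):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

val kryo = new Kryo()
val bytes = new ByteArrayOutputStream()
val out = new Output(bytes)
kryo.writeObject(out, "batch-1")
kryo.writeObject(out, "batch-2")
out.close()

val underlying = new ByteArrayInputStream(bytes.toByteArray)
val first = new Input(underlying)   // buffers well past the end of "batch-1"
println(kryo.readObject(first, classOf[String]))

val second = new Input(underlying)  // the underlying stream is already drained
try {
  println(kryo.readObject(second, classOf[String]))
} catch {
  case e: com.esotericsoftware.kryo.KryoException =>
    // "batch-2" was pulled into `first`'s buffer, so it looks lost here
    println("second batch not found: " + e.getMessage)
}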

If you still run into the same problem on master, then it could be a corner
case that our current handling of hash collisions misses. When you have the
time, do let us know what you find!
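
To make the corner case concrete, here is a tiny illustration (a minimal
sketch, not the actual map implementation) of why key equality has to fall
back to equals() when hash codes tie:

// Distinct keys can share a hash code, so comparing hash codes alone can
// merge rows that are not actually equal.
def sameKey(a: AnyRef, b: AnyRef): Boolean =
  a.hashCode == b.hashCode && a == b

// "Aa" and "BB" are different strings with the same String hashCode.
assert("Aa".hashCode == "BB".hashCode)
assert(!sameKey("Aa", "BB"))
assert(sameKey("Aa", "Aa"))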

Andrew

2014-02-18 21:08 GMT-08:00 Andrew Ash <and...@andrewash.com>:

> I'm using Kryo with these options:
>
> -Dspark.shuffle.spill=false -Dspark.storage.memoryFraction=0.4
> -Dspark.serializer=org.apache.spark.serializer.KryoSerializer
> -Dspark.kryo.registrator=com.andrewash.CustomKryoRegistrator
>
> The data is being read from a .lzo file and written back to another .lzo
> file if that affects things.  Does that cover the compression and
> serialization libraries question?
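
A registrator like the one named above is typically just a small class
implementing Spark's KryoRegistrator trait. A minimal sketch (the class name
and registered types here are illustrative, not the actual
com.andrewash.CustomKryoRegistrator):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class CustomKryoRegistrator extends KryoRegistrator {
  // Register the concrete types that flow through the shuffle so Kryo can
  // serialize them without writing out full class names each time.
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[(String, String)])
    kryo.register(classOf[Array[String]])
  }
}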
>
> I can give master a shot with my repro, but it may be some time since I now
> have a workaround.  I'm trying to turn something around quickly and have my
> own bugs to debug as well :)
>
> Thanks!
> Andrew
>
>
> On Tue, Feb 18, 2014 at 9:02 PM, Andrew Or <andrewo...@gmail.com> wrote:
>
> > Looks like you have a large number of distinct keys. As you suspect, this
> > may be due to hash collisions, of which there are only about 4 billion
> > possible values. It could be related to this PR:
> > https://github.com/apache/incubator-spark/pull/612.
> >
> > The other thing is that we had some issues with the behavior of arbitrary
> > serialization/compression engines, and this is solved in the PR that
> > Mridul referenced. What compression and serialization libraries are you
> > using?
> >
> >
> > 2014-02-18 20:56 GMT-08:00 Mridul Muralidharan <mri...@gmail.com>:
> >
> > > I had not resolved it in time for 0.9, but IIRC there was a recent PR
> > > which fixed bugs in spill [1]: are you able to reproduce this with
> > > Spark master?
> > >
> > > Regards,
> > > Mridul
> > >
> > > [1] https://github.com/apache/incubator-spark/pull/533
> > >
> > > > On Wed, Feb 19, 2014 at 9:58 AM, Andrew Ash <and...@andrewash.com> wrote:
> > > > I confirmed also that the spill to disk _was_ occurring:
> > > >
> > > > 14/02/18 22:50:50 WARN collection.ExternalAppendOnlyMap: Spilling
> > > > in-memory map of 634 MB to disk (1 time so far)
> > > > 14/02/18 22:50:50 WARN collection.ExternalAppendOnlyMap: Spilling
> > > > in-memory map of 581 MB to disk (1 time so far)
> > > >
> > > >
> > > > On Tue, Feb 18, 2014 at 8:07 PM, Andrew Ash <and...@andrewash.com> wrote:
> > > >
> > > >> Hi dev list,
> > > >>
> > > >> I'm running into an issue where I'm seeing different results from
> > > >> Spark when I run with spark.shuffle.spill=false vs leaving it at the
> > > >> default (true).
> > > >>
> > > >> It's on internal data so I can't share my exact repro, but here's
> > > >> roughly what I'm doing:
> > > >>
> > > >> val rdd = sc.textFile(...)
> > > >>   .map(l => ... (col1, col2))  // parse CSV into Tuple2[String,String]
> > > >>   .distinct
> > > >>   .join(
> > > >>     sc.textFile(...)
> > > >>       .map(l => ... (col1, col2))  // parse CSV into Tuple2[String,String]
> > > >>       .distinct
> > > >>   )
> > > >>   .map { case (k, (v1, v2)) => Seq(v1, k, v2).mkString("|") }
> > > >>
> > > >> Then I output:
> > > >> (rdd.count, rdd.distinct.count)
> > > >>
> > > >> When I run with spark.shuffle.spill=false I get this:
> > > >> (3192729,3192729)
> > > >>
> > > >> And with spark.shuffle.spill=true I get this:
> > > >> (3192931,3192726)
> > > >>
> > > >> Has anyone else seen any bugs in join-heavy operations while using
> > > >> spark.shuffle.spill=true?
> > > >>
> > > >> My current theory is that I have a hashCode collision between rows
> > > >> (unusual, I know) and that AppendOnlyMap does equality based on
> > > >> hashCode() + equals(), while ExternalAppendOnlyMap does equality based
> > > >> just on hashCode().
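
If it helps, one way to test that theory directly is to check whether any
distinct keys in the inputs actually share a hashCode. A rough sketch, where
keyedRdd stands in for either of the (col1, col2) RDDs before the join:

import org.apache.spark.SparkContext._  // pair-RDD operations (keys, groupByKey)

// List distinct keys that share a hashCode; any hit is a candidate for the
// merge behavior described above.
val collisions = keyedRdd.keys.distinct()
  .map(k => (k.hashCode, k))
  .groupByKey()
  .filter { case (_, ks) => ks.toSet.size > 1 }

collisions.take(10).foreach(println)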
> > > >>
> > > >> Would appreciate some additional eyes on this problem for sure.
> > > >>
> > > >> Right now I'm looking through the source and tests for AppendOnlyMap
> > > >> and ExternalAppendOnlyMap to see if anything jumps out at me.
> > > >>
> > > >> Thanks!
> > > >> Andrew
> > > >>
> > >
> >
>
