Sorry I just saw Graham's email after sending my previous email about this
bug...

I have been seeing this same issue on our ALS runs last week but I thought
it was due my hacky way to run mllib 1.1 snapshot on core 1.0...

What's the status of this PR ? Will this fix be back-ported to 1.0.1 as we
are running 1.0.1 stable standalone cluster ?

Till the PR merges does it make sense to not use Kryo ? What are the other
recommended efficient serializers ?

Thanks.
Deb


On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com>
wrote:

> I now have a complete pull request for this issue that I'd like to get
> reviewed and committed.  The PR is available here:
> https://github.com/apache/spark/pull/1890 and includes a testcase for the
> issue I described.  I've also submitted a related PR (
> https://github.com/apache/spark/pull/1827) that causes exceptions raised
> while attempting to run the custom kryo registrator not to be swallowed.
>
> Thanks,
> Graham
>
>
> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:
>
> > I've submitted a work-in-progress pull request for this issue that I'd
> > like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
> > also submitted a pull request for the related issue that the exceptions
> hit
> > when trying to use a custom kryo registrator are being swallowed:
> > https://github.com/apache/spark/pull/1827
> >
> > The approach in my pull request is to get the Worker processes to
> download
> > the application jars and add them to the Executor class path at launch
> > time. There are a couple of things that still need to be done before this
> > can be merged:
> > 1. At the moment, the first time a task runs in the executor, the
> > application jars are downloaded again.  My solution here would be to make
> > the executor not download any jars that already exist.  Previously, the
> > driver & executor kept track of the timestamp of jar files and would
> > redownload 'updated' jars, however this never made sense as the previous
> > version of the updated jar may have already been loaded into the
> executor,
> > so the updated jar may have no effect.  As my current pull request
> removes
> > the timestamp for jars, just checking whether the jar exists will allow
> us
> > to avoid downloading the jars again.
> > 2. Tests. :-)
> >
> > A side-benefit of my pull request is that you will be able to use custom
> > serialisers that are distributed in a user jar.  Currently, the
> serialiser
> > instance is created in the Executor process before the first task is
> > received and therefore before any user jars are downloaded.  As this PR
> > adds user jars to the Executor process at launch time, this won't be an
> > issue.
> >
> >
> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
> >
> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
> >> the full stacktrace, but it's in the BlockManager/BlockManagerWorker
> where
> >> it's trying to fulfil a "getBlock" request for another node.  The
> objects
> >> that would be in the block haven't yet been serialised, and that then
> >> causes the deserialisation to happen on that thread.  See
> >> MemoryStore.scala:102.
> >>
> >>
> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
> >>
> >>> I don't think it was a conscious design decision to not include the
> >>> application classes in the connection manager serializer. We should fix
> >>> that. Where is it deserializing data in that thread?
> >>>
> >>>  4 might make sense in the long run, but it adds a lot of complexity to
> >>> the code base (whole separate code base, task queue,
> blocking/non-blocking
> >>> logic within task threads) that can be error prone, so I think it is
> best
> >>> to stay away from that right now.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com
> >
> >>> wrote:
> >>>
> >>>> Hi Spark devs,
> >>>>
> >>>> I’ve posted an issue on JIRA (
> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
> >>>> using
> >>>> Kryo serialisation with a custom Kryo registrator to register custom
> >>>> classes with Kryo.  This is an insidious issue that
> >>>> non-deterministically
> >>>> causes Kryo to have different ID number => class name maps on
> different
> >>>> nodes, which then causes weird exceptions (ClassCastException,
> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
> >>>> deserialisation
> >>>> time.  I’ve created a reliable reproduction for the issue here:
> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
> >>>>
> >>>> I’m happy to try and put a pull request together to try and address
> >>>> this,
> >>>> but it’s not obvious to me the right way to solve this and I’d like to
> >>>> get
> >>>> feedback / ideas on how to address this.
> >>>>
> >>>> The root cause of the problem is a "Failed to run
> >>>> spark.kryo.registrator”
> >>>> error which non-deterministically occurs in some executor processes
> >>>> during
> >>>> operation.  My custom Kryo registrator is in the application jar, and
> >>>> it is
> >>>> accessible on the worker nodes.  This is demonstrated by the fact that
> >>>> most
> >>>> of the time the custom kryo registrator is successfully run.
> >>>>
> >>>> What’s happening is that Kryo serialisation/deserialisation is
> happening
> >>>> most of the time on an “Executor task launch worker” thread, which has
> >>>> the
> >>>> thread's class loader set to contain the application jar.  This
> happens
> >>>> in
> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I
> can
> >>>> tell, it is only these threads that have access to the application jar
> >>>> (that contains the custom Kryo registrator).  However, the
> >>>> ConnectionManager threads sometimes need to serialise/deserialise
> >>>> objects
> >>>> to satisfy “getBlock” requests when the objects haven’t previously
> been
> >>>> serialised.  As the ConnectionManager threads don’t have the
> application
> >>>> jar available from their class loader, when it tries to look up the
> >>>> custom
> >>>> Kryo registrator, this fails.  Spark then swallows this exception,
> which
> >>>> results in a different ID number —> class mapping for this kryo
> >>>> instance,
> >>>> and this then causes deserialisation errors later on a different node.
> >>>>
> >>>> A related issue to the issue reported in SPARK-2878 is that Spark
> >>>> probably
> >>>> shouldn’t swallow the ClassNotFound exception for custom Kryo
> >>>> registrators.
> >>>>  The user has explicitly specified this class, and if it
> >>>> deterministically
> >>>> can’t be found, then it may cause problems at serialisation /
> >>>> deserialisation time.  If only sometimes it can’t be found (as in this
> >>>> case), then it leads to a data corruption issue later on.  Either way,
> >>>> we’re better off dying due to the ClassNotFound exception earlier,
> than
> >>>> the
> >>>> weirder errors later on.
> >>>>
> >>>> I have some ideas on potential solutions to this issue, but I’m keen
> for
> >>>> experienced eyes to critique these approaches:
> >>>>
> >>>> 1. The simplest approach to fixing this would be to just make the
> >>>> application jar available to the connection manager threads, but I’m
> >>>> guessing it’s a design decision to isolate the application jar to just
> >>>> the
> >>>> executor task runner threads.  Also, I don’t know if there are any
> other
> >>>> threads that might be interacting with kryo serialisation /
> >>>> deserialisation.
> >>>> 2. Before looking up the custom Kryo registrator, change the thread’s
> >>>> class
> >>>> loader to include the application jar, then restore the class loader
> >>>> after
> >>>> the kryo registrator has been run.  I don’t know if this would have
> any
> >>>> other side-effects.
> >>>> 3. Always serialise / deserialise on the existing TaskRunner threads,
> >>>> rather than delaying serialisation until later, when it can be done
> >>>> only if
> >>>> needed.  This approach would probably have negative performance
> >>>> consequences.
> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
> >>>> deserialisation that has the application jar on the class path.
> >>>>  Serialisation / deserialisation would be the only thing these threads
> >>>> do,
> >>>> and this would minimise conflicts / interactions between the
> application
> >>>> jar and other jars.
> >>>>
> >>>> #4 sounds like the best approach to me, but I think would require
> >>>> considerable knowledge of Spark internals, which is beyond me at
> >>>> present.
> >>>>  Does anyone have any better (and ideally simpler) ideas?
> >>>>
> >>>> Cheers,
> >>>>
> >>>> Graham
> >>>>
> >>>
> >>>
> >>
> >
>

Reply via email to