I've submitted a work-in-progress pull request for this issue that I'd like
feedback on.  See https://github.com/apache/spark/pull/1890 . I've also
submitted a pull request for the related issue that the exceptions hit when
trying to use a custom kryo registrator are being swallowed:
https://github.com/apache/spark/pull/1827

The approach in my pull request is to get the Worker processes to download
the application jars and add them to the Executor class path at launch
time. There are a couple of things that still need to be done before this
can be merged:
1. At the moment, the first time a task runs in the executor, the
application jars are downloaded again.  My solution here would be to make
the executor not download any jars that already exist.  Previously, the
driver & executor kept track of the timestamp of jar files and would
redownload 'updated' jars, however this never made sense as the previous
version of the updated jar may have already been loaded into the executor,
so the updated jar may have no effect.  As my current pull request removes
the timestamp for jars, just checking whether the jar exists will allow us
to avoid downloading the jars again.
2. Tests. :-)

A side-benefit of my pull request is that you will be able to use custom
serialisers that are distributed in a user jar.  Currently, the serialiser
instance is created in the Executor process before the first task is
received and therefore before any user jars are downloaded.  As this PR
adds user jars to the Executor process at launch time, this won't be an
issue.


On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:

> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
> the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
> it's trying to fulfil a "getBlock" request for another node.  The objects
> that would be in the block haven't yet been serialised, and that then
> causes the deserialisation to happen on that thread.  See
> MemoryStore.scala:102.
>
>
> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>
>> I don't think it was a conscious design decision to not include the
>> application classes in the connection manager serializer. We should fix
>> that. Where is it deserializing data in that thread?
>>
>> 4 might make sense in the long run, but it adds a lot of complexity to
>> the code base (whole separate code base, task queue, blocking/non-blocking
>> logic within task threads) that can be error prone, so I think it is best
>> to stay away from that right now.
>>
>>
>>
>>
>>
>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com>
>> wrote:
>>
>>> Hi Spark devs,
>>>
>>> I’ve posted an issue on JIRA (
>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
>>> using
>>> Kryo serialisation with a custom Kryo registrator to register custom
>>> classes with Kryo.  This is an insidious issue that non-deterministically
>>> causes Kryo to have different ID number => class name maps on different
>>> nodes, which then causes weird exceptions (ClassCastException,
>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>> deserialisation
>>> time.  I’ve created a reliable reproduction for the issue here:
>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>
>>> I’m happy to try and put a pull request together to try and address this,
>>> but it’s not obvious to me the right way to solve this and I’d like to
>>> get
>>> feedback / ideas on how to address this.
>>>
>>> The root cause of the problem is a "Failed to run spark.kryo.registrator”
>>> error which non-deterministically occurs in some executor processes
>>> during
>>> operation.  My custom Kryo registrator is in the application jar, and it
>>> is
>>> accessible on the worker nodes.  This is demonstrated by the fact that
>>> most
>>> of the time the custom kryo registrator is successfully run.
>>>
>>> What’s happening is that Kryo serialisation/deserialisation is happening
>>> most of the time on an “Executor task launch worker” thread, which has
>>> the
>>> thread's class loader set to contain the application jar.  This happens
>>> in
>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
>>> tell, it is only these threads that have access to the application jar
>>> (that contains the custom Kryo registrator).  However, the
>>> ConnectionManager threads sometimes need to serialise/deserialise objects
>>> to satisfy “getBlock” requests when the objects haven’t previously been
>>> serialised.  As the ConnectionManager threads don’t have the application
>>> jar available from their class loader, when it tries to look up the
>>> custom
>>> Kryo registrator, this fails.  Spark then swallows this exception, which
>>> results in a different ID number —> class mapping for this kryo instance,
>>> and this then causes deserialisation errors later on a different node.
>>>
>>> A related issue to the issue reported in SPARK-2878 is that Spark
>>> probably
>>> shouldn’t swallow the ClassNotFound exception for custom Kryo
>>> registrators.
>>>  The user has explicitly specified this class, and if it
>>> deterministically
>>> can’t be found, then it may cause problems at serialisation /
>>> deserialisation time.  If only sometimes it can’t be found (as in this
>>> case), then it leads to a data corruption issue later on.  Either way,
>>> we’re better off dying due to the ClassNotFound exception earlier, than
>>> the
>>> weirder errors later on.
>>>
>>> I have some ideas on potential solutions to this issue, but I’m keen for
>>> experienced eyes to critique these approaches:
>>>
>>> 1. The simplest approach to fixing this would be to just make the
>>> application jar available to the connection manager threads, but I’m
>>> guessing it’s a design decision to isolate the application jar to just
>>> the
>>> executor task runner threads.  Also, I don’t know if there are any other
>>> threads that might be interacting with kryo serialisation /
>>> deserialisation.
>>> 2. Before looking up the custom Kryo registrator, change the thread’s
>>> class
>>> loader to include the application jar, then restore the class loader
>>> after
>>> the kryo registrator has been run.  I don’t know if this would have any
>>> other side-effects.
>>> 3. Always serialise / deserialise on the existing TaskRunner threads,
>>> rather than delaying serialisation until later, when it can be done only
>>> if
>>> needed.  This approach would probably have negative performance
>>> consequences.
>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>> deserialisation that has the application jar on the class path.
>>>  Serialisation / deserialisation would be the only thing these threads
>>> do,
>>> and this would minimise conflicts / interactions between the application
>>> jar and other jars.
>>>
>>> #4 sounds like the best approach to me, but I think would require
>>> considerable knowledge of Spark internals, which is beyond me at present.
>>>  Does anyone have any better (and ideally simpler) ideas?
>>>
>>> Cheers,
>>>
>>> Graham
>>>
>>
>>
>

Reply via email to