See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the
full stacktrace, but it's in the BlockManager/BlockManagerWorker where it's
trying to fulfil a "getBlock" request for another node.  The objects that
would be in the block haven't yet been serialised, and that then causes the
deserialisation to happen on that thread.  See MemoryStore.scala:102.


On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:

> I don't think it was a conscious design decision to not include the
> application classes in the connection manager serializer. We should fix
> that. Where is it deserializing data in that thread?
>
> 4 might make sense in the long run, but it adds a lot of complexity to the
> code base (whole separate code base, task queue, blocking/non-blocking
> logic within task threads) that can be error prone, so I think it is best
> to stay away from that right now.
>
>
>
>
>
> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com>
> wrote:
>
>> Hi Spark devs,
>>
>> I’ve posted an issue on JIRA (
>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
>> Kryo serialisation with a custom Kryo registrator to register custom
>> classes with Kryo.  This is an insidious issue that non-deterministically
>> causes Kryo to have different ID number => class name maps on different
>> nodes, which then causes weird exceptions (ClassCastException,
>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
>> time.  I’ve created a reliable reproduction for the issue here:
>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>
>> I’m happy to try and put a pull request together to try and address this,
>> but it’s not obvious to me the right way to solve this and I’d like to get
>> feedback / ideas on how to address this.
>>
>> The root cause of the problem is a "Failed to run spark.kryo.registrator”
>> error which non-deterministically occurs in some executor processes during
>> operation.  My custom Kryo registrator is in the application jar, and it
>> is
>> accessible on the worker nodes.  This is demonstrated by the fact that
>> most
>> of the time the custom kryo registrator is successfully run.
>>
>> What’s happening is that Kryo serialisation/deserialisation is happening
>> most of the time on an “Executor task launch worker” thread, which has the
>> thread's class loader set to contain the application jar.  This happens in
>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
>> tell, it is only these threads that have access to the application jar
>> (that contains the custom Kryo registrator).  However, the
>> ConnectionManager threads sometimes need to serialise/deserialise objects
>> to satisfy “getBlock” requests when the objects haven’t previously been
>> serialised.  As the ConnectionManager threads don’t have the application
>> jar available from their class loader, when it tries to look up the custom
>> Kryo registrator, this fails.  Spark then swallows this exception, which
>> results in a different ID number —> class mapping for this kryo instance,
>> and this then causes deserialisation errors later on a different node.
>>
>> A related issue to the issue reported in SPARK-2878 is that Spark probably
>> shouldn’t swallow the ClassNotFound exception for custom Kryo
>> registrators.
>>  The user has explicitly specified this class, and if it deterministically
>> can’t be found, then it may cause problems at serialisation /
>> deserialisation time.  If only sometimes it can’t be found (as in this
>> case), then it leads to a data corruption issue later on.  Either way,
>> we’re better off dying due to the ClassNotFound exception earlier, than
>> the
>> weirder errors later on.
>>
>> I have some ideas on potential solutions to this issue, but I’m keen for
>> experienced eyes to critique these approaches:
>>
>> 1. The simplest approach to fixing this would be to just make the
>> application jar available to the connection manager threads, but I’m
>> guessing it’s a design decision to isolate the application jar to just the
>> executor task runner threads.  Also, I don’t know if there are any other
>> threads that might be interacting with kryo serialisation /
>> deserialisation.
>> 2. Before looking up the custom Kryo registrator, change the thread’s
>> class
>> loader to include the application jar, then restore the class loader after
>> the kryo registrator has been run.  I don’t know if this would have any
>> other side-effects.
>> 3. Always serialise / deserialise on the existing TaskRunner threads,
>> rather than delaying serialisation until later, when it can be done only
>> if
>> needed.  This approach would probably have negative performance
>> consequences.
>> 4. Create a new dedicated thread pool for lazy serialisation /
>> deserialisation that has the application jar on the class path.
>>  Serialisation / deserialisation would be the only thing these threads do,
>> and this would minimise conflicts / interactions between the application
>> jar and other jars.
>>
>> #4 sounds like the best approach to me, but I think would require
>> considerable knowledge of Spark internals, which is beyond me at present.
>>  Does anyone have any better (and ideally simpler) ideas?
>>
>> Cheers,
>>
>> Graham
>>
>
>

Reply via email to