Yes, I understand it might not work for custom serializers, but that is a much less common path.
Basically I want a quick fix for the 1.1 release (which is coming up soon). I would not be comfortable making big changes to the class path late into the release cycle. We can do that for 1.2.

On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com> wrote:

> That should work, but would you also make these changes to the JavaSerializer? The API of these is the same so that you can select one or the other (or in theory a custom serializer). This also wouldn't address the problem of shipping custom *serializers* (not kryo registrators) in user jars.
>
> On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:
>
>> Graham,
>>
>> SparkEnv only creates a KryoSerializer, but as I understand it, that serializer doesn't actually initialize the registrator, since that only happens when newKryo() is called as a KryoSerializerInstance is initialized.
>>
>> Basically I'm thinking a quick fix for 1.2:
>>
>> 1. Add a classLoader field to KryoSerializer; initialize new KryoSerializerInstances with that class loader.
>>
>> 2. Set that classLoader to the executor's class loader when the Executor is initialized.
>>
>> Then all deserialization calls should be using the executor's class loader.
>>
>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:
>>
>>> Hi Reynold,
>>>
>>> That would solve this specific issue, but you'd need to be careful never to create a serialiser instance before the first task is received. Currently in Executor.TaskRunner.run a closure serialiser instance is created before any application jars are downloaded, but that could be moved. To me, this seems a little fragile.
>>>
>>> However, there is a related issue where you can't ship a custom serialiser in an application jar, because the serialiser is instantiated when the SparkEnv object is created, which is before any tasks are received by the executor. The above approach wouldn't help with that problem.
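Reynold's two-step quick fix quoted above could be sketched roughly as follows. This is a minimal illustration only: `MockKryoSerializer` and `MockSerializerInstance` are simplified stand-ins, not the actual Spark classes.

```scala
// Step 1 (sketch): give the serializer a mutable class-loader field that
// every new serializer instance captures. "Mock" names are illustrative
// stand-ins, not the real Spark KryoSerializer / KryoSerializerInstance.
class MockSerializerInstance(val classLoader: ClassLoader)
// A real instance would pass classLoader to newKryo() so the registrator
// lookup resolves classes through it; here we only record the loader.

class MockKryoSerializer {
  // Defaults to the creating thread's context loader; overridable later.
  @volatile var defaultClassLoader: ClassLoader =
    Thread.currentThread().getContextClassLoader

  def newInstance(): MockSerializerInstance =
    new MockSerializerInstance(defaultClassLoader)
}

// Step 2 (sketch): when the executor starts, point the field at the
// executor's (application-jar-aware) class loader.
val serializer = new MockKryoSerializer
val executorLoader =
  new java.net.URLClassLoader(Array.empty[java.net.URL], ClassLoader.getSystemClassLoader)
serializer.defaultClassLoader = executorLoader

// Every instance created afterwards -- including ones used on connection
// manager threads -- now resolves classes via the executor's loader.
assert(serializer.newInstance().classLoader eq executorLoader)
```

The point of the mutable field is ordering: the serializer object can exist before the executor's loader does, and instances created after the field is set all see the right loader.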
>>> Additionally, the YARN scheduler currently uses this approach of adding the application jar to the Executor classpath, so it would make things a bit more uniform.
>>>
>>> Cheers,
>>> Graham
>>>
>>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Graham,
>>>>
>>>> Thanks for working on this. This is an important bug to fix.
>>>>
>>>> I don't have the whole context, and obviously I haven't spent nearly as much time on this as you have, but I'm wondering: what if we always pass the executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>>>>
>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>
>>>>> Hi Deb,
>>>>>
>>>>> The only alternative serialiser is the JavaSerialiser (the default). Theoretically Spark supports custom serialisers, but due to a related issue, custom serialisers currently can't live in application jars and must be available to all executors at launch. My PR fixes this issue as well, allowing custom serialisers to be shipped in application jars.
>>>>>
>>>>> Graham
>>>>>
>>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:
>>>>>
>>>>>> Sorry, I just saw Graham's email after sending my previous email about this bug...
>>>>>>
>>>>>> I have been seeing this same issue on our ALS runs last week, but I thought it was due to my hacky way of running the mllib 1.1 snapshot on core 1.0...
>>>>>>
>>>>>> What's the status of this PR? Will this fix be back-ported to 1.0.1, as we are running a 1.0.1 stable standalone cluster?
>>>>>>
>>>>>> Until the PR merges, does it make sense to not use Kryo? What are the other recommended efficient serializers?
>>>>>>
>>>>>> Thanks,
>>>>>> Deb
>>>>>>
>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>
>>>>>>> I now have a complete pull request for this issue that I'd like to get reviewed and committed. The PR is available here: https://github.com/apache/spark/pull/1890 and includes a test case for the issue I described. I've also submitted a related PR (https://github.com/apache/spark/pull/1827) so that exceptions raised while attempting to run the custom kryo registrator are not swallowed.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Graham
>>>>>>>
>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>
>>>>>>> > I've submitted a work-in-progress pull request for this issue that I'd like feedback on. See https://github.com/apache/spark/pull/1890 . I've also submitted a pull request for the related issue that exceptions hit when trying to use a custom kryo registrator are being swallowed: https://github.com/apache/spark/pull/1827
>>>>>>> >
>>>>>>> > The approach in my pull request is to get the Worker processes to download the application jars and add them to the Executor class path at launch time. There are a couple of things that still need to be done before this can be merged:
>>>>>>> >
>>>>>>> > 1. At the moment, the first time a task runs in the executor, the application jars are downloaded again. My solution here would be to make the executor not download any jars that already exist.
>>>>>>> > Previously, the driver & executor kept track of the timestamp of jar files and would re-download 'updated' jars; however, this never made sense, as the previous version of an updated jar may already have been loaded into the executor, in which case the updated jar would have no effect. As my current pull request removes the timestamp for jars, just checking whether the jar exists will allow us to avoid downloading the jars again.
>>>>>>> >
>>>>>>> > 2. Tests. :-)
>>>>>>> >
>>>>>>> > A side benefit of my pull request is that you will be able to use custom serialisers that are distributed in a user jar. Currently, the serialiser instance is created in the Executor process before the first task is received, and therefore before any user jars are downloaded. As this PR adds user jars to the Executor process at launch time, this won't be an issue.
>>>>>>> >
>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stack trace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that then causes the deserialisation to happen on that thread. See MemoryStore.scala:102.
>>>>>>> >>
>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>>>> >>
>>>>>>> >>> I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that. Where is it deserializing data in that thread?
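Point 1 of the quoted PR plan (skip jars that already exist once timestamps are gone) can be sketched as below. The names `fetchJar` and `ensureJar` are hypothetical, not the actual Spark methods, and the "download" is faked with a local file write.

```scala
// Sketch of the "don't re-download existing jars" idea. fetchJar/ensureJar
// are illustrative names; the real code would fetch from the driver or a
// worker-local copy rather than writing fake bytes.
import java.io.File
import java.nio.file.Files

val targetDir = Files.createTempDirectory("executor-jars").toFile
var fetchCount = 0

// Stand-in for the remote fetch.
def fetchJar(name: String, dir: File): File = {
  fetchCount += 1
  val f = new File(dir, name)
  Files.write(f.toPath, Array[Byte](0x50, 0x4b)) // fake jar contents
  f
}

// Without jar timestamps, a plain existence check suffices: a jar already on
// disk has (at worst) already been loaded into the executor, so fetching an
// "updated" copy could not change anything anyway.
def ensureJar(name: String, dir: File): File = {
  val local = new File(dir, name)
  if (local.exists()) local else fetchJar(name, dir)
}

ensureJar("app.jar", targetDir) // first task: downloads
ensureJar("app.jar", targetDir) // later tasks: no-op
assert(fetchCount == 1)
```

This matches the argument in the quoted message: once the timestamp-based "updated jar" logic is removed, existence is the only property worth checking.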
>>>>>>> >>> 4 might make sense in the long run, but it adds a lot of complexity to the code base (whole separate code base, task queue, blocking/non-blocking logic within task threads) that can be error-prone, so I think it is best to stay away from that right now.
>>>>>>> >>>
>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>>> Hi Spark devs,
>>>>>>> >>>>
>>>>>>> >>>> I’ve posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to have different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time. I’ve created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>> >>>>
>>>>>>> >>>> I’m happy to try to put a pull request together to address this, but the right way to solve it is not obvious to me, and I’d like feedback / ideas on how to address it.
>>>>>>> >>>>
>>>>>>> >>>> The root cause of the problem is a "Failed to run spark.kryo.registrator” error which non-deterministically occurs in some executor processes during operation. My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes.
>>>>>>> >>>> This is demonstrated by the fact that most of the time the custom kryo registrator is successfully run.
>>>>>>> >>>>
>>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation happens most of the time on an “Executor task launch worker” thread, which has the thread's class loader set to contain the application jar. This happens in `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can tell, it is only these threads that have access to the application jar (which contains the custom Kryo registrator). However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy “getBlock” requests when the objects haven’t previously been serialised. As the ConnectionManager threads don’t have the application jar available from their class loader, the lookup of the custom Kryo registrator fails. Spark then swallows this exception, which results in a different ID number => class mapping for this kryo instance, and this then causes deserialisation errors later on a different node.
>>>>>>> >>>>
>>>>>>> >>>> A related issue to the one reported in SPARK-2878 is that Spark probably shouldn’t swallow the ClassNotFound exception for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can’t be found, then it may cause problems at serialisation / deserialisation time. If it only sometimes can’t be found (as in this case), then it leads to a data corruption issue later on.
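The diverging ID number => class maps described above can be illustrated without Kryo itself. Kryo identifies registered classes by small integer IDs assigned in registration order, so this toy model (plain maps; `com.example.MyRecord` is a hypothetical user class, and the "builtin" list is a stand-in for Spark's default registrations) shows why a silently skipped registrator corrupts data:

```scala
// Toy model of Kryo's sequential class registration. If the registrator
// silently fails on one executor, that executor's ID -> class table differs
// from everyone else's.
def buildIdTable(registratorRan: Boolean): Map[Int, String] = {
  val builtin = Seq("int", "String", "scala.Tuple2") // stand-ins for defaults
  val custom  = Seq("com.example.MyRecord")          // hypothetical user class
  val classes = if (registratorRan) builtin ++ custom else builtin
  classes.zipWithIndex.map { case (cls, id) => (id, cls) }.toMap
}

val healthyExecutor = buildIdTable(registratorRan = true)
val brokenExecutor  = buildIdTable(registratorRan = false)

// The sender serialises a MyRecord under its registered ID...
val idOnWire = healthyExecutor.find(_._2 == "com.example.MyRecord").get._1

// ...but on the executor whose registrator silently failed, that ID is
// unknown (or, with more registered classes, maps to a *different* class),
// yielding the ClassCastException / ArrayIndexOutOfBoundsException errors
// described above at deserialisation time.
assert(idOnWire == 3)
assert(!brokenExecutor.contains(idOnWire))
```

This is also why the failure is non-deterministic end to end: it only bites when a block serialised by one table happens to be deserialised under the other.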
>>>>>>> >>>> Either way, we’re better off dying from the ClassNotFound exception earlier than from the weirder errors later on.
>>>>>>> >>>>
>>>>>>> >>>> I have some ideas on potential solutions to this issue, but I’m keen for experienced eyes to critique these approaches:
>>>>>>> >>>>
>>>>>>> >>>> 1. The simplest approach would be to just make the application jar available to the connection manager threads, but I’m guessing it’s a design decision to isolate the application jar to just the executor task runner threads. Also, I don’t know whether there are any other threads that might be interacting with kryo serialisation / deserialisation.
>>>>>>> >>>>
>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the thread’s class loader to include the application jar, then restore the class loader after the kryo registrator has been run. I don’t know if this would have any other side effects.
>>>>>>> >>>>
>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed. This approach would probably have negative performance consequences.
>>>>>>> >>>>
>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation / deserialisation that has the application jar on the class path. Serialisation / deserialisation would be the only thing these threads do, and this would minimise conflicts / interactions between the application jar and other jars.
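Option 2 in the list above is essentially the standard JVM context-class-loader swap pattern. A minimal sketch, assuming a hypothetical `withClassLoader` helper (the empty `URLClassLoader` stands in for a loader that can actually see the application jar):

```scala
// Sketch of option 2: temporarily swap the thread's context class loader to
// one that can see the application jar, run the registrator lookup inside,
// and restore the original loader afterwards -- even if the lookup fails.
def withClassLoader[T](loader: ClassLoader)(body: => T): T = {
  val thread = Thread.currentThread()
  val previous = thread.getContextClassLoader
  thread.setContextClassLoader(loader)
  try body
  finally thread.setContextClassLoader(previous) // always restore
}

val original = Thread.currentThread().getContextClassLoader
// Stand-in for a loader with the application jar on its path.
val appJarLoader = new java.net.URLClassLoader(Array.empty[java.net.URL], original)

val seenInside = withClassLoader(appJarLoader) {
  // Here a Class.forName(registratorClassName, true,
  // Thread.currentThread().getContextClassLoader) would see the app jar.
  Thread.currentThread().getContextClassLoader
}

assert(seenInside eq appJarLoader)
assert(Thread.currentThread().getContextClassLoader eq original)
```

The try/finally is the part that addresses the "other side effects" worry: the swap is invisible to anything running on the thread before or after the guarded block.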
>>>>>>> >>>> #4 sounds like the best approach to me, but I think it would require considerable knowledge of Spark internals, which is beyond me at present. Does anyone have any better (and ideally simpler) ideas?
>>>>>>> >>>>
>>>>>>> >>>> Cheers,
>>>>>>> >>>>
>>>>>>> >>>> Graham