Sorry I just saw Graham's email after sending my previous email about this bug...
I have been seeing this same issue on our ALS runs last week, but I thought it was due to my hacky way of running an mllib 1.1 snapshot on core 1.0...

What's the status of this PR? Will this fix be back-ported to 1.0.1, as we are running a 1.0.1 stable standalone cluster?

Until the PR merges, does it make sense to not use Kryo? What are the other recommended efficient serializers?

Thanks.
Deb

On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

> I now have a complete pull request for this issue that I'd like to get
> reviewed and committed. The PR is available here:
> https://github.com/apache/spark/pull/1890 and includes a testcase for the
> issue I described. I've also submitted a related PR
> (https://github.com/apache/spark/pull/1827) that causes exceptions raised
> while attempting to run the custom kryo registrator not to be swallowed.
>
> Thanks,
> Graham
>
> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:
>
> > I've submitted a work-in-progress pull request for this issue that I'd
> > like feedback on. See https://github.com/apache/spark/pull/1890 . I've
> > also submitted a pull request for the related issue that the exceptions
> > hit when trying to use a custom kryo registrator are being swallowed:
> > https://github.com/apache/spark/pull/1827
> >
> > The approach in my pull request is to get the Worker processes to
> > download the application jars and add them to the Executor class path at
> > launch time. There are a couple of things that still need to be done
> > before this can be merged:
> >
> > 1. At the moment, the first time a task runs in the executor, the
> >    application jars are downloaded again. My solution here would be to
> >    make the executor not download any jars that already exist.
> >    Previously, the driver & executor kept track of the timestamp of jar
> >    files and would redownload 'updated' jars, however this never made
> >    sense as the previous version of the updated jar may have already been
> >    loaded into the executor, so the updated jar may have no effect. As my
> >    current pull request removes the timestamp for jars, just checking
> >    whether the jar exists will allow us to avoid downloading the jars
> >    again.
> > 2. Tests. :-)
> >
> > A side-benefit of my pull request is that you will be able to use custom
> > serialisers that are distributed in a user jar. Currently, the serialiser
> > instance is created in the Executor process before the first task is
> > received and therefore before any user jars are downloaded. As this PR
> > adds user jars to the Executor process at launch time, this won't be an
> > issue.
> >
> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
> >
> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
> >> the full stacktrace, but it's in the BlockManager/BlockManagerWorker
> >> where it's trying to fulfil a "getBlock" request for another node. The
> >> objects that would be in the block haven't yet been serialised, and that
> >> then causes the deserialisation to happen on that thread. See
> >> MemoryStore.scala:102.
> >>
> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
> >>
> >>> I don't think it was a conscious design decision to not include the
> >>> application classes in the connection manager serializer. We should fix
> >>> that. Where is it deserializing data in that thread?
> >>>
> >>> 4 might make sense in the long run, but it adds a lot of complexity to
> >>> the code base (whole separate code base, task queue,
> >>> blocking/non-blocking logic within task threads) that can be error
> >>> prone, so I think it is best to stay away from that right now.
> >>>
> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Spark devs,
> >>>>
> >>>> I’ve posted an issue on JIRA
> >>>> (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
> >>>> using Kryo serialisation with a custom Kryo registrator to register
> >>>> custom classes with Kryo. This is an insidious issue that
> >>>> non-deterministically causes Kryo to have different ID number => class
> >>>> name maps on different nodes, which then causes weird exceptions
> >>>> (ClassCastException, ClassNotFoundException,
> >>>> ArrayIndexOutOfBoundsException) at deserialisation time. I’ve created a
> >>>> reliable reproduction for the issue here:
> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
> >>>>
> >>>> I’m happy to try and put a pull request together to try and address
> >>>> this, but it’s not obvious to me the right way to solve this and I’d
> >>>> like to get feedback / ideas on how to address this.
> >>>>
> >>>> The root cause of the problem is a “Failed to run
> >>>> spark.kryo.registrator” error which non-deterministically occurs in
> >>>> some executor processes during operation. My custom Kryo registrator is
> >>>> in the application jar, and it is accessible on the worker nodes. This
> >>>> is demonstrated by the fact that most of the time the custom kryo
> >>>> registrator is successfully run.
> >>>>
> >>>> What’s happening is that Kryo serialisation/deserialisation is
> >>>> happening most of the time on an “Executor task launch worker” thread,
> >>>> which has the thread's class loader set to contain the application jar.
> >>>> This happens in `org.apache.spark.executor.Executor.TaskRunner.run`,
> >>>> and from what I can tell, it is only these threads that have access to
> >>>> the application jar (that contains the custom Kryo registrator).
> >>>> However, the ConnectionManager threads sometimes need to
> >>>> serialise/deserialise objects to satisfy “getBlock” requests when the
> >>>> objects haven’t previously been serialised. As the ConnectionManager
> >>>> threads don’t have the application jar available from their class
> >>>> loader, when it tries to look up the custom Kryo registrator, this
> >>>> fails. Spark then swallows this exception, which results in a different
> >>>> ID number => class mapping for this kryo instance, and this then causes
> >>>> deserialisation errors later on a different node.
> >>>>
> >>>> A related issue to the issue reported in SPARK-2878 is that Spark
> >>>> probably shouldn’t swallow the ClassNotFound exception for custom Kryo
> >>>> registrators. The user has explicitly specified this class, and if it
> >>>> deterministically can’t be found, then it may cause problems at
> >>>> serialisation / deserialisation time. If only sometimes it can’t be
> >>>> found (as in this case), then it leads to a data corruption issue later
> >>>> on. Either way, we’re better off dying due to the ClassNotFound
> >>>> exception earlier, than the weirder errors later on.
> >>>>
> >>>> I have some ideas on potential solutions to this issue, but I’m keen
> >>>> for experienced eyes to critique these approaches:
> >>>>
> >>>> 1. The simplest approach to fixing this would be to just make the
> >>>>    application jar available to the connection manager threads, but I’m
> >>>>    guessing it’s a design decision to isolate the application jar to
> >>>>    just the executor task runner threads. Also, I don’t know if there
> >>>>    are any other threads that might be interacting with kryo
> >>>>    serialisation / deserialisation.
> >>>> 2. Before looking up the custom Kryo registrator, change the thread’s
> >>>>    class loader to include the application jar, then restore the class
> >>>>    loader after the kryo registrator has been run. I don’t know if this
> >>>>    would have any other side-effects.
> >>>> 3. Always serialise / deserialise on the existing TaskRunner threads,
> >>>>    rather than delaying serialisation until later, when it can be done
> >>>>    only if needed. This approach would probably have negative
> >>>>    performance consequences.
> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
> >>>>    deserialisation that has the application jar on the class path.
> >>>>    Serialisation / deserialisation would be the only thing these
> >>>>    threads do, and this would minimise conflicts / interactions between
> >>>>    the application jar and other jars.
> >>>>
> >>>> #4 sounds like the best approach to me, but I think would require
> >>>> considerable knowledge of Spark internals, which is beyond me at
> >>>> present. Does anyone have any better (and ideally simpler) ideas?
> >>>>
> >>>> Cheers,
> >>>>
> >>>> Graham
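
For anyone following option 2 in the quoted message, here is a minimal Java sketch of the swap-and-restore pattern for a thread's context class loader. It is not the code from either pull request, and it uses only the JDK, not Spark or Kryo. The class name ContextClassLoaderSwap and the helper withContextClassLoader are hypothetical, and the empty URLClassLoader is only a stand-in for a loader that would contain the application jar.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only: not part of Spark or Kryo.
final class ContextClassLoaderSwap {
    private ContextClassLoaderSwap() {}

    // Runs body with loader installed as the current thread's context class
    // loader, then restores the previous loader even if body throws.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Assumption: a real loader would list the application jar URLs here.
        ClassLoader appLoader = new URLClassLoader(new URL[0], original);

        // Inside the body, lookups through the context class loader see appLoader.
        boolean installed = withContextClassLoader(appLoader,
                () -> Thread.currentThread().getContextClassLoader() == appLoader);
        boolean restored =
                Thread.currentThread().getContextClassLoader() == original;

        System.out.println("installed during body: " + installed);
        System.out.println("restored afterwards: " + restored);
    }
}
```

The try/finally is the point of the pattern: the thread gets its original loader back whether or not the lookup fails, so any other work that thread does afterwards is unaffected. Whether this is safe on the ConnectionManager threads is exactly the open question raised in the thread.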