By the way, I have seen this same problem while deploying 1.1.0-SNAPSHOT on YARN as well...
So it is a common problem in both standalone and YARN deployment modes...

On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Reynold,

That would solve this specific issue, but you'd need to be careful never to create a serialiser instance before the first task is received. Currently, in Executor.TaskRunner.run, a closure serialiser instance is created before any application jars are downloaded, but that could be moved. To me, this seems a little fragile.

However, there is a related issue: you can't ship a custom serialiser in an application jar, because the serialiser is instantiated when the SparkEnv object is created, which is before any tasks are received by the executor. The above approach wouldn't help with that problem. Additionally, the YARN scheduler currently uses this approach of adding the application jar to the Executor classpath, so it would also make things a bit more uniform.

Cheers,
Graham

On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:

Graham,

Thanks for working on this. This is an important bug to fix.

I don't have the whole context, and obviously I haven't spent nearly as much time on this as you have, but I'm wondering: what if we always pass the executor's ClassLoader to the Kryo serializer? Will that solve this problem?

On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Deb,

The only alternative serialiser is the JavaSerialiser (the default). Theoretically Spark supports custom serialisers, but due to a related issue, custom serialisers currently can't live in application jars and must be available to all executors at launch. My PR fixes this issue as well, allowing custom serialisers to be shipped in application jars.

Graham

On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:

Sorry, I just saw Graham's email after sending my previous email about this bug...

I have been seeing this same issue on our ALS runs last week, but I thought it was due to my hacky way of running the mllib 1.1 snapshot on core 1.0...

What's the status of this PR? Will this fix be back-ported to 1.0.1, since we are running a 1.0.1 stable standalone cluster?

Until the PR merges, does it make sense to not use Kryo? What are the other recommended efficient serializers?

Thanks,
Deb

On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

I now have a complete pull request for this issue that I'd like to get reviewed and committed. The PR is available at https://github.com/apache/spark/pull/1890 and includes a test case for the issue I described. I've also submitted a related PR (https://github.com/apache/spark/pull/1827) so that exceptions raised while attempting to run the custom Kryo registrator are no longer swallowed.

Thanks,
Graham
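For reference, the approach Reynold asks about above — always passing the executor's ClassLoader to the Kryo serializer — would look roughly like the sketch below. This is illustrative only: Kryo's setClassLoader method is real, but the newKryo wrapper and its wiring are assumptions, not Spark's actual code.

    import com.esotericsoftware.kryo.Kryo

    // Hand Kryo an explicit class loader that can see the application jars,
    // so class-name resolution no longer depends on which thread (TaskRunner
    // vs ConnectionManager) happens to trigger (de)serialisation.
    def newKryo(executorClassLoader: ClassLoader): Kryo = {
      val kryo = new Kryo()
      kryo.setClassLoader(executorClassLoader)
      kryo
    }

As Graham notes above, this only helps if no Kryo instance is ever created before the executor's class loader knows about the application jars.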
On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:

I've submitted a work-in-progress pull request for this issue that I'd like feedback on; see https://github.com/apache/spark/pull/1890 . I've also submitted a pull request for the related issue that exceptions hit when trying to use a custom Kryo registrator are being swallowed: https://github.com/apache/spark/pull/1827

The approach in my pull request is to get the Worker processes to download the application jars and add them to the Executor class path at launch time. There are a couple of things that still need to be done before this can be merged:

1. At the moment, the first time a task runs in the executor, the application jars are downloaded again. My solution here would be to make the executor not download any jars that already exist (see the sketch after this message). Previously, the driver and executor kept track of the timestamps of jar files and would re-download 'updated' jars; however, this never made sense, because the previous version of the updated jar may have already been loaded into the executor, in which case the updated jar would have no effect. As my current pull request removes the timestamps for jars, just checking whether a jar exists will allow us to avoid downloading it again.
2. Tests. :-)

A side benefit of my pull request is that you will be able to use custom serialisers that are distributed in a user jar. Currently, the serialiser instance is created in the Executor process before the first task is received, and therefore before any user jars are downloaded. As this PR adds user jars to the Executor process at launch time, this won't be an issue.

On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:

See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stack trace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that then causes the serialisation to happen on that thread. See MemoryStore.scala:102.

On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:

I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that. Where is it deserializing data in that thread?

#4 might make sense in the long run, but it adds a lot of complexity to the code base (a whole separate code path, task queue, blocking/non-blocking logic within task threads) that can be error-prone, so I think it is best to stay away from that right now.
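To make item 1 of the 12 August message concrete, the "skip jars that already exist" check might look roughly like the following. This is a sketch under assumptions: fetchJarIfAbsent and downloadJar are hypothetical names standing in for the executor's existing jar-fetching code.

    import java.io.File

    // Once per-jar timestamps are removed, existence alone decides whether
    // to fetch: a jar already on disk may already have been loaded into the
    // executor, so re-downloading an "updated" copy would have no effect.
    def fetchJarIfAbsent(jarName: String, targetDir: File)
                        (downloadJar: (String, File) => Unit): File = {
      val localJar = new File(targetDir, jarName)
      if (!localJar.exists()) {
        downloadJar(jarName, targetDir)
      }
      localJar
    }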
On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Spark devs,

I've posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to have different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time. I've created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation

I'm happy to try to put a pull request together to address this, but the right way to solve it isn't obvious to me, and I'd like feedback and ideas on how to approach it.

The root cause of the problem is a "Failed to run spark.kryo.registrator" error which non-deterministically occurs in some executor processes during operation. My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes: most of the time, the custom Kryo registrator is successfully run.

What's happening is that Kryo serialisation/deserialisation mostly happens on an "Executor task launch worker" thread, whose class loader is set (in org.apache.spark.executor.Executor.TaskRunner.run) to include the application jar. From what I can tell, only these threads have access to the application jar (which contains the custom Kryo registrator). However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy "getBlock" requests when the objects haven't previously been serialised. As the ConnectionManager threads don't have the application jar available from their class loader, the lookup of the custom Kryo registrator fails. Spark then swallows this exception, which results in a different ID number => class name mapping for this Kryo instance, and that then causes deserialisation errors later on a different node.

A related issue to the one reported in SPARK-2878 is that Spark probably shouldn't swallow the ClassNotFoundException for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can't be found, then it may cause problems at serialisation/deserialisation time. If it only sometimes can't be found (as in this case), then it leads to a data-corruption issue later on. Either way, we're better off dying from the ClassNotFoundException early than from the weirder errors later.
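To make the failure mode concrete: the registrator lookup goes through the calling thread's context class loader, so the same code succeeds on a TaskRunner thread and fails on a ConnectionManager thread. A simplified sketch follows — not Spark's actual code; the error handling in particular is condensed:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    def runRegistrator(kryo: Kryo, registratorClassName: String): Unit = {
      // Resolves the registrator via the calling thread's context class
      // loader: succeeds on "Executor task launch worker" threads, whose
      // loader includes the application jar, and fails on ConnectionManager
      // threads, whose loader does not.
      val loader = Thread.currentThread.getContextClassLoader
      try {
        Class.forName(registratorClassName, true, loader)
          .newInstance()
          .asInstanceOf[KryoRegistrator]
          .registerClasses(kryo)
      } catch {
        case _: ClassNotFoundException =>
          // Swallowing this (as Spark currently does) leaves this Kryo
          // instance with a different ID => class mapping than instances
          // where registration succeeded, corrupting data between nodes.
      }
    }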
I have some ideas on potential solutions to this issue, but I'm keen for experienced eyes to critique these approaches:

1. The simplest approach would be to make the application jar available to the ConnectionManager threads, but I'm guessing it was a design decision to isolate the application jar to just the executor task-runner threads. Also, I don't know whether any other threads might be interacting with Kryo serialisation/deserialisation.
2. Before looking up the custom Kryo registrator, change the thread's class loader to include the application jar, then restore the class loader after the Kryo registrator has been run (a sketch of this pattern appears after this message). I don't know whether this would have any other side-effects.
3. Always serialise/deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed. This approach would probably have negative performance consequences.
4. Create a new dedicated thread pool for lazy serialisation/deserialisation that has the application jar on the class path. Serialisation/deserialisation would be the only thing these threads do, and this would minimise conflicts/interactions between the application jar and other jars.

#4 sounds like the best approach to me, but I think it would require considerable knowledge of Spark internals, which is beyond me at present. Does anyone have any better (and ideally simpler) ideas?

Cheers,

Graham
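Option 2 above — swapping the thread's context class loader around the registrator call and then restoring it — is a standard pattern. A minimal sketch, where applicationJarLoader is a hypothetical loader that includes the downloaded application jars:

    // Run `body` with the given context class loader, restoring the
    // previous loader afterwards even if `body` throws.
    def withClassLoader[T](loader: ClassLoader)(body: => T): T = {
      val thread = Thread.currentThread
      val previous = thread.getContextClassLoader
      thread.setContextClassLoader(loader)
      try body
      finally thread.setContextClassLoader(previous)
    }

    // Usage (names hypothetical):
    // withClassLoader(applicationJarLoader) {
    //   runRegistrator(kryo, registratorClassName)
    // }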