Is there a fix that I can test? I have the flows set up for both standalone and YARN runs...
Thanks.
Deb

On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <r...@databricks.com> wrote:
> Yes, I understand it might not work for a custom serializer, but that is a much less common path.
>
> Basically I want a quick fix for the 1.1 release (which is coming up soon). I would not be comfortable making big changes to the class path late in the release cycle. We can do that for 1.2.
>
> On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com> wrote:
>> That should work, but would you also make these changes to the JavaSerializer? The API of the two is the same so that you can select one or the other (or, in theory, a custom serializer). This also wouldn't address the problem of shipping custom *serializers* (not Kryo registrators) in user jars.
>>
>> On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:
>>> Graham,
>>>
>>> SparkEnv only creates a KryoSerializer, but as I understand it, that serializer doesn't actually initialize the registrator, since the registrator is only called from newKryo(), which runs when a KryoSerializerInstance is initialized.
>>>
>>> Basically I'm thinking of a quick fix for 1.2:
>>>
>>> 1. Add a classLoader field to KryoSerializer; initialize each new KryoSerializerInstance with that class loader.
>>> 2. Set that classLoader to the executor's class loader when the Executor is initialized.
>>>
>>> Then all deser calls should be using the executor's class loader.
>>>
>>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>> Hi Reynold,
>>>>
>>>> That would solve this specific issue, but you'd need to be careful that you never create a serialiser instance before the first task is received. Currently, in Executor.TaskRunner.run a closure serialiser instance is created before any application jars are downloaded, but that could be moved. To me, this seems a little fragile.
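Reynold's two-step quick fix (a class loader field on the serializer that each new serializer instance picks up, set to the executor's class loader when the Executor starts) can be modelled with plain JDK class loaders. This is a hypothetical sketch, not Spark's `KryoSerializer` API: `LoaderAwareSerializer`, `SerializerInstance`, `setDefaultClassLoader` and `newInstance` are illustrative names. It also makes Graham's caveat concrete: an instance created before the loader is set keeps the old loader.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical model of the proposed fix; not Spark's KryoSerializer API.
public class LoaderAwareSerializerDemo {

    /** Factory that hands its current default class loader to each new instance. */
    static final class LoaderAwareSerializer {
        // volatile so a loader set on one thread is visible to instances created on others
        private volatile ClassLoader defaultClassLoader;

        LoaderAwareSerializer(ClassLoader initial) {
            this.defaultClassLoader = initial;
        }

        /** Called once the executor's class loader (with the application jars) exists. */
        void setDefaultClassLoader(ClassLoader loader) {
            this.defaultClassLoader = loader;
        }

        /** Each instance captures the loader at creation time, as a per-instance Kryo would. */
        SerializerInstance newInstance() {
            return new SerializerInstance(defaultClassLoader);
        }
    }

    static final class SerializerInstance {
        final ClassLoader loader;

        SerializerInstance(ClassLoader loader) {
            this.loader = loader;
        }

        /** Resolve a class name the way a deserializer would: via the captured loader. */
        Class<?> resolve(String className) throws ClassNotFoundException {
            return Class.forName(className, false, loader);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        LoaderAwareSerializer serializer = new LoaderAwareSerializer(system);

        // Created too early: captures the system loader (the "fragile" case).
        SerializerInstance early = serializer.newInstance();

        // Stand-in for the executor's loader; a real one would list the application jar URLs.
        URLClassLoader executorLoader = new URLClassLoader(new URL[0], system);
        serializer.setDefaultClassLoader(executorLoader);
        SerializerInstance late = serializer.newInstance();

        System.out.println(early.loader == executorLoader); // false
        System.out.println(late.loader == executorLoader);  // true
        System.out.println(late.resolve("java.lang.String").getName());
        executorLoader.close();
    }
}
```

The design choice the sketch exposes is that the loader is read at instance-creation time, so the ordering between "set the loader" and "create the first instance" is what makes the approach work or fail.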
>>>>
>>>> However, there is a related issue: you can't ship a custom serialiser in an application jar, because the serialiser is instantiated when the SparkEnv object is created, which is before any tasks are received by the executor. The above approach wouldn't help with this problem. Additionally, the YARN scheduler currently uses this approach of adding the application jar to the Executor classpath, so it would make things a bit more uniform.
>>>>
>>>> Cheers,
>>>> Graham
>>>>
>>>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>>>> Graham,
>>>>>
>>>>> Thanks for working on this. This is an important bug to fix.
>>>>>
>>>>> I don't have the whole context and obviously I haven't spent nearly as much time on this as you have, but I'm wondering: what if we always pass the executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>>>>>
>>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>> Hi Deb,
>>>>>>
>>>>>> The only alternative serialiser is the JavaSerialiser (the default). Theoretically Spark supports custom serialisers, but due to a related issue, custom serialisers currently can't live in application jars and must be available to all executors at launch. My PR fixes this issue as well, allowing custom serialisers to be shipped in application jars.
>>>>>>
>>>>>> Graham
>>>>>>
>>>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:
>>>>>>> Sorry, I just saw Graham's email after sending my previous email about this bug...
>>>>>>>
>>>>>>> I was seeing this same issue on our ALS runs last week, but I thought it was due to my hacky way of running the MLlib 1.1 snapshot on core 1.0...
>>>>>>>
>>>>>>> What's the status of this PR?
>>>>>>> Will this fix be back-ported to 1.0.1, as we are running a 1.0.1 stable standalone cluster?
>>>>>>>
>>>>>>> Till the PR merges, does it make sense to not use Kryo? What are the other recommended efficient serializers?
>>>>>>>
>>>>>>> Thanks.
>>>>>>> Deb
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>> I now have a complete pull request for this issue that I'd like to get reviewed and committed. The PR is available here: https://github.com/apache/spark/pull/1890 and includes a testcase for the issue I described. I've also submitted a related PR (https://github.com/apache/spark/pull/1827) that stops exceptions raised while attempting to run the custom Kryo registrator from being swallowed.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Graham
>>>>>>>>
>>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>>> I've submitted a work-in-progress pull request for this issue that I'd like feedback on. See https://github.com/apache/spark/pull/1890. I've also submitted a pull request for the related issue where exceptions hit when trying to use a custom Kryo registrator are swallowed: https://github.com/apache/spark/pull/1827
>>>>>>>>>
>>>>>>>>> The approach in my pull request is to get the Worker processes to download the application jars and add them to the Executor class path at launch time. There are a couple of things that still need to be done before this can be merged:
>>>>>>>>> 1. At the moment, the first time a task runs in the executor, the application jars are downloaded again. My solution here would be to make the executor not download any jars that already exist.
>>>>>>>>>    Previously, the driver & executor kept track of the timestamp of jar files and would redownload 'updated' jars; however, this never made sense, as the previous version of the updated jar may have already been loaded into the executor, so the updated jar may have no effect. As my current pull request removes the timestamp for jars, just checking whether the jar exists will allow us to avoid downloading the jars again.
>>>>>>>>> 2. Tests. :-)
>>>>>>>>>
>>>>>>>>> A side-benefit of my pull request is that you will be able to use custom serialisers that are distributed in a user jar. Currently, the serialiser instance is created in the Executor process before the first task is received, and therefore before any user jars are downloaded. As this PR adds user jars to the Executor process at launch time, this won't be an issue.
>>>>>>>>>
>>>>>>>>> On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>>>> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stacktrace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that then causes the deserialisation to happen on that thread. See MemoryStore.scala:102.
>>>>>>>>>>
>>>>>>>>>> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>>>> I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that.
>>>>>>>>>>> Where is it deserializing data in that thread?
>>>>>>>>>>>
>>>>>>>>>>> #4 might make sense in the long run, but it adds a lot of complexity to the code base (whole separate code base, task queue, blocking/non-blocking logic within task threads) that can be error prone, so I think it is best to stay away from that right now.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>>>>>> Hi Spark devs,
>>>>>>>>>>>>
>>>>>>>>>>>> I’ve posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to have different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time. I’ve created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>>>>>>>
>>>>>>>>>>>> I’m happy to try to put a pull request together to address this, but it’s not obvious to me what the right way to solve this is, and I’d like to get feedback / ideas on how to address it.
>>>>>>>>>>>>
>>>>>>>>>>>> The root cause of the problem is a “Failed to run spark.kryo.registrator” error, which non-deterministically occurs in some executor processes during operation.
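The "different ID number => class name maps on different nodes" symptom can be reproduced with a simplified model, under the assumption that, as in Kryo, registered classes receive consecutive integer IDs in registration order. `IdRegistry`, `build`, the `failFast` flag and all `com.example`/`example` class names are hypothetical; this is not Kryo's or Spark's code. When one node's registrator lookup fails and the failure is swallowed, every later ID shifts; with `failFast` the error surfaces immediately, which is the behaviour argued for further down in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of sequential class-ID registration; this is NOT Kryo's or Spark's code.
public class RegistrationDivergenceDemo {

    /** Assigns the next free integer ID to each registered class name. */
    static final class IdRegistry {
        private final Map<Integer, String> idToClass = new HashMap<>();
        private int nextId = 0;

        void register(String className) {
            idToClass.put(nextId++, className);
        }

        /** Returns the class name for an ID, or null if this node never assigned it. */
        String classFor(int id) {
            return idToClass.get(id);
        }
    }

    /**
     * Builds one node's registry: a built-in class, then the user registrator's classes,
     * then a class registered afterwards. registratorLoads=false simulates a thread whose
     * class loader cannot see the application jar.
     */
    static IdRegistry build(boolean registratorLoads, boolean failFast) {
        IdRegistry registry = new IdRegistry();
        registry.register("java.lang.String"); // stands in for built-in registrations
        try {
            if (!registratorLoads) {
                throw new ClassNotFoundException("com.example.MyRegistrator"); // hypothetical name
            }
            registry.register("com.example.MyKey");   // hypothetical user classes
            registry.register("com.example.MyValue");
        } catch (ClassNotFoundException e) {
            if (failFast) {
                // Surface the problem immediately instead of corrupting the ID mapping.
                throw new IllegalStateException("Failed to run registrator", e);
            }
            // Swallowed: this node silently continues without the user classes.
        }
        registry.register("example.RegisteredLater"); // hypothetical class registered afterwards
        return registry;
    }

    public static void main(String[] args) {
        IdRegistry writer = build(true, false);  // node whose registrator ran
        IdRegistry reader = build(false, false); // node whose registrator failed silently

        // Same ID, different class: the writer's ID 1 is MyKey, the reader's is RegisteredLater.
        System.out.println(writer.classFor(1) + " vs " + reader.classFor(1));
        // The writer encodes RegisteredLater as ID 3; the reader has no ID 3 at all.
        System.out.println(writer.classFor(3) + " vs " + reader.classFor(3));

        try {
            build(false, true);
        } catch (IllegalStateException e) {
            System.out.println("fail fast: " + e.getCause().getMessage());
        }
    }
}
```

Running it prints the mismatched mappings for IDs 1 and 3, then the fail-fast message naming the missing registrator class.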
>>>>>>>>>>>>
>>>>>>>>>>>> My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes. This is demonstrated by the fact that most of the time the custom Kryo registrator is successfully run.
>>>>>>>>>>>>
>>>>>>>>>>>> What’s happening is that Kryo serialisation/deserialisation happens most of the time on an “Executor task launch worker” thread, which has the thread's class loader set to contain the application jar. This happens in `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can tell, it is only these threads that have access to the application jar (which contains the custom Kryo registrator). However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy “getBlock” requests when the objects haven’t previously been serialised. As the ConnectionManager threads don’t have the application jar available from their class loader, looking up the custom Kryo registrator fails on those threads. Spark then swallows this exception, which results in a different ID number => class mapping for this Kryo instance, and this then causes deserialisation errors later on a different node.
>>>>>>>>>>>>
>>>>>>>>>>>> An issue related to the one reported in SPARK-2878 is that Spark probably shouldn’t swallow the ClassNotFound exception for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can’t be found, then it may cause problems at serialisation / deserialisation time. If it only sometimes can’t be found (as in this case), then it leads to a data corruption issue later on. Either way, we’re better off dying early due to the ClassNotFound exception than hitting the weirder errors later on.
>>>>>>>>>>>>
>>>>>>>>>>>> I have some ideas on potential solutions to this issue, but I’m keen for experienced eyes to critique these approaches:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. The simplest approach to fixing this would be to just make the application jar available to the connection manager threads, but I’m guessing it’s a design decision to isolate the application jar to just the executor task runner threads. Also, I don’t know if there are any other threads that might be interacting with Kryo serialisation / deserialisation.
>>>>>>>>>>>> 2. Before looking up the custom Kryo registrator, change the thread’s class loader to include the application jar, then restore the class loader after the Kryo registrator has been run. I don’t know if this would have any other side-effects.
>>>>>>>>>>>> 3. Always serialise / deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed. This approach would probably have negative performance consequences.
>>>>>>>>>>>> 4. Create a new dedicated thread pool for lazy serialisation / deserialisation that has the application jar on the class path. Serialisation / deserialisation would be the only thing these threads do, and this would minimise conflicts / interactions between the application jar and other jars.
>>>>>>>>>>>>
>>>>>>>>>>>> #4 sounds like the best approach to me, but I think it would require considerable knowledge of Spark internals, which is beyond me at present. Does anyone have any better (and ideally simpler) ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Graham
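Approaches 2 and 4 from Graham's list both come down to controlling a thread's context class loader. The JDK-only sketch below assumes, for illustration, that the registrator lookup resolves classes through `Thread.currentThread().getContextClassLoader()`; `withContextClassLoader` and `newSerialisationPool` are hypothetical helper names, not Spark APIs. The first swaps the loader for one action and restores it in a `finally` block (approach 2); the second builds a dedicated pool whose threads all start with the application loader (approach 4).

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of approaches 2 and 4; helper names are hypothetical, not Spark APIs.
public class ContextLoaderDemo {

    /** Approach 2: run an action with a temporary context class loader, then restore it. */
    static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Restore even if the action throws, so a reused thread is left unchanged.
            current.setContextClassLoader(previous);
        }
    }

    /** Approach 4: a dedicated pool whose threads all start with the application loader. */
    static ExecutorService newSerialisationPool(ClassLoader appLoader, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "serialisation-" + counter.incrementAndGet());
            t.setContextClassLoader(appLoader);
            t.setDaemon(true); // do not keep the JVM alive just for this pool
            return t;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // Stand-in for a loader that sees the application jar; a real one would list its URL.
        URLClassLoader appLoader = new URLClassLoader(new URL[0], system);
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        ClassLoader seenInside = withContextClassLoader(appLoader,
                () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seenInside == appLoader);                                  // true
        System.out.println(Thread.currentThread().getContextClassLoader() == before); // true

        ExecutorService pool = newSerialisationPool(appLoader, 2);
        ClassLoader seenInPool = pool.submit(() -> Thread.currentThread().getContextClassLoader()).get();
        System.out.println(seenInPool == appLoader);                                  // true
        pool.shutdown();
        appLoader.close();
    }
}
```

The `finally` restore is what keeps approach 2 contained: connection manager threads are long-lived and reused, so leaving the swapped loader in place would extend the change well beyond the registrator lookup, which is the kind of side-effect Graham raises.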