In part, my assertion was based on a comment by sryza on my PR (https://github.com/apache/spark/pull/1890#issuecomment-51805750), but I thought I had also seen it in the YARN code base. Now that I look for it, though, I can't find where this happens, so perhaps I was imagining the YARN behaviour.

On 14 August 2014 17:57, Debasish Das <debasish.da...@gmail.com> wrote:
> By the way I have seen this same problem while deploying 1.1.0-SNAPSHOT on
> YARN as well...
>
> So it is a common problem in both standalone and YARN mode deployment...
>
> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:
>
>> Hi Reynold,
>>
>> That would solve this specific issue, but you'd need to be careful that
>> you never created a serialiser instance before the first task is received.
>> Currently in Executor.TaskRunner.run a closure serialiser instance is
>> created before any application jars are downloaded, but that could be
>> moved. To me, this seems a little fragile.
>>
>> However there is a related issue where you can't ship a custom serialiser
>> in an application jar because the serialiser is instantiated when the
>> SparkEnv object is created, which is before any tasks are received by the
>> executor. The above approach wouldn't help with this problem.
>> Additionally, the YARN scheduler currently uses this approach of adding
>> the application jar to the Executor classpath, so it would make things a
>> bit more uniform.
>>
>> Cheers,
>> Graham
>>
>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Graham,
>>>
>>> Thanks for working on this. This is an important bug to fix.
>>>
>>> I don't have the whole context and obviously I haven't spent nearly as
>>> much time on this as you have, but I'm wondering what if we always pass the
>>> executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>>>
>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>
>>>> Hi Deb,
>>>>
>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>> Theoretically Spark supports custom serialisers, but due to a related
>>>> issue, custom serialisers currently can't live in application jars and must
>>>> be available to all executors at launch.
>>>> My PR fixes this issue as well, allowing custom serialisers to be shipped in
>>>> application jars.
>>>>
>>>> Graham
>>>>
>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:
>>>>
>>>>> Sorry I just saw Graham's email after sending my previous email about
>>>>> this bug...
>>>>>
>>>>> I have been seeing this same issue on our ALS runs last week but I
>>>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>>>>>
>>>>> What's the status of this PR ? Will this fix be back-ported to 1.0.1
>>>>> as we are running 1.0.1 stable standalone cluster ?
>>>>>
>>>>> Till the PR merges does it make sense to not use Kryo ? What are the
>>>>> other recommended efficient serializers ?
>>>>>
>>>>> Thanks.
>>>>> Deb
>>>>>
>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>
>>>>>> I now have a complete pull request for this issue that I'd like to get
>>>>>> reviewed and committed. The PR is available here:
>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase for the
>>>>>> issue I described. I've also submitted a related PR (
>>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions raised
>>>>>> while attempting to run the custom kryo registrator not to be swallowed.
>>>>>>
>>>>>> Thanks,
>>>>>> Graham
>>>>>>
>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>
>>>>>> > I've submitted a work-in-progress pull request for this issue that I'd
>>>>>> > like feedback on. See https://github.com/apache/spark/pull/1890 .
>>>>>> > I've also submitted a pull request for the related issue that the
>>>>>> > exceptions hit when trying to use a custom kryo registrator are being
>>>>>> > swallowed: https://github.com/apache/spark/pull/1827
>>>>>> >
>>>>>> > The approach in my pull request is to get the Worker processes to download
>>>>>> > the application jars and add them to the Executor class path at launch
>>>>>> > time. There are a couple of things that still need to be done before this
>>>>>> > can be merged:
>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>> > application jars are downloaded again. My solution here would be to make
>>>>>> > the executor not download any jars that already exist. Previously, the
>>>>>> > driver & executor kept track of the timestamp of jar files and would
>>>>>> > redownload 'updated' jars, however this never made sense as the previous
>>>>>> > version of the updated jar may have already been loaded into the executor,
>>>>>> > so the updated jar may have no effect. As my current pull request removes
>>>>>> > the timestamp for jars, just checking whether the jar exists will allow us
>>>>>> > to avoid downloading the jars again.
>>>>>> > 2. Tests. :-)
>>>>>> >
>>>>>> > A side-benefit of my pull request is that you will be able to use custom
>>>>>> > serialisers that are distributed in a user jar. Currently, the serialiser
>>>>>> > instance is created in the Executor process before the first task is
>>>>>> > received and therefore before any user jars are downloaded. As this PR
>>>>>> > adds user jars to the Executor process at launch time, this won't be an
>>>>>> > issue.
>>>>>> >
>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>> >
>>>>>> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
>>>>>> >> the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
>>>>>> >> it's trying to fulfil a "getBlock" request for another node. The objects
>>>>>> >> that would be in the block haven't yet been serialised, and that then
>>>>>> >> causes the deserialisation to happen on that thread. See
>>>>>> >> MemoryStore.scala:102.
>>>>>> >>
>>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>>> >>
>>>>>> >>> I don't think it was a conscious design decision to not include the
>>>>>> >>> application classes in the connection manager serializer. We should fix
>>>>>> >>> that. Where is it deserializing data in that thread?
>>>>>> >>>
>>>>>> >>> 4 might make sense in the long run, but it adds a lot of complexity to
>>>>>> >>> the code base (whole separate code base, task queue, blocking/non-blocking
>>>>>> >>> logic within task threads) that can be error prone, so I think it is best
>>>>>> >>> to stay away from that right now.
>>>>>> >>>
>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>>> Hi Spark devs,
>>>>>> >>>>
>>>>>> >>>> I’ve posted an issue on JIRA (
>>>>>> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
>>>>>> >>>> Kryo serialisation with a custom Kryo registrator to register custom
>>>>>> >>>> classes with Kryo.
>>>>>> >>>> This is an insidious issue that non-deterministically
>>>>>> >>>> causes Kryo to have different ID number => class name maps on different
>>>>>> >>>> nodes, which then causes weird exceptions (ClassCastException,
>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
>>>>>> >>>> time. I’ve created a reliable reproduction for the issue here:
>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>> >>>>
>>>>>> >>>> I’m happy to try and put a pull request together to try and address this,
>>>>>> >>>> but it’s not obvious to me the right way to solve this and I’d like to get
>>>>>> >>>> feedback / ideas on how to address this.
>>>>>> >>>>
>>>>>> >>>> The root cause of the problem is a "Failed to run spark.kryo.registrator"
>>>>>> >>>> error which non-deterministically occurs in some executor processes during
>>>>>> >>>> operation. My custom Kryo registrator is in the application jar, and it is
>>>>>> >>>> accessible on the worker nodes. This is demonstrated by the fact that most
>>>>>> >>>> of the time the custom kryo registrator is successfully run.
>>>>>> >>>>
>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation is happening
>>>>>> >>>> most of the time on an “Executor task launch worker” thread, which has the
>>>>>> >>>> thread's class loader set to contain the application jar. This happens in
>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
>>>>>> >>>> tell, it is only these threads that have access to the application jar
>>>>>> >>>> (that contains the custom Kryo registrator).
>>>>>> >>>> However, the ConnectionManager threads sometimes need to
>>>>>> >>>> serialise/deserialise objects to satisfy “getBlock” requests when the
>>>>>> >>>> objects haven’t previously been serialised. As the ConnectionManager
>>>>>> >>>> threads don’t have the application jar available from their class loader,
>>>>>> >>>> when it tries to look up the custom Kryo registrator, this fails. Spark
>>>>>> >>>> then swallows this exception, which results in a different ID number ->
>>>>>> >>>> class mapping for this kryo instance, and this then causes deserialisation
>>>>>> >>>> errors later on a different node.
>>>>>> >>>>
>>>>>> >>>> A related issue to the issue reported in SPARK-2878 is that Spark probably
>>>>>> >>>> shouldn’t swallow the ClassNotFound exception for custom Kryo registrators.
>>>>>> >>>> The user has explicitly specified this class, and if it deterministically
>>>>>> >>>> can’t be found, then it may cause problems at serialisation /
>>>>>> >>>> deserialisation time. If only sometimes it can’t be found (as in this
>>>>>> >>>> case), then it leads to a data corruption issue later on. Either way,
>>>>>> >>>> we’re better off dying due to the ClassNotFound exception earlier, than the
>>>>>> >>>> weirder errors later on.
>>>>>> >>>>
>>>>>> >>>> I have some ideas on potential solutions to this issue, but I’m keen for
>>>>>> >>>> experienced eyes to critique these approaches:
>>>>>> >>>>
>>>>>> >>>> 1. The simplest approach to fixing this would be to just make the
>>>>>> >>>> application jar available to the connection manager threads, but I’m
>>>>>> >>>> guessing it’s a design decision to isolate the application jar to just the
>>>>>> >>>> executor task runner threads.
>>>>>> >>>> Also, I don’t know if there are any other threads that might be
>>>>>> >>>> interacting with kryo serialisation / deserialisation.
>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the thread’s class
>>>>>> >>>> loader to include the application jar, then restore the class loader after
>>>>>> >>>> the kryo registrator has been run. I don’t know if this would have any
>>>>>> >>>> other side-effects.
>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner threads,
>>>>>> >>>> rather than delaying serialisation until later, when it can be done only if
>>>>>> >>>> needed. This approach would probably have negative performance
>>>>>> >>>> consequences.
>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>>> >>>> deserialisation that has the application jar on the class path.
>>>>>> >>>> Serialisation / deserialisation would be the only thing these threads do,
>>>>>> >>>> and this would minimise conflicts / interactions between the application
>>>>>> >>>> jar and other jars.
>>>>>> >>>>
>>>>>> >>>> #4 sounds like the best approach to me, but I think would require
>>>>>> >>>> considerable knowledge of Spark internals, which is beyond me at present.
>>>>>> >>>> Does anyone have any better (and ideally simpler) ideas?
>>>>>> >>>>
>>>>>> >>>> Cheers,
>>>>>> >>>>
>>>>>> >>>> Graham
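
For readers following the thread, option 2 in the original message (change the thread's class loader before looking up the registrator, then restore it) can be sketched in plain Java roughly as below. This is only an illustration under stated assumptions, not what either pull request does and not Spark code: the names `ContextClassLoaderSwap`, `withContextClassLoader` and `loadOrFail` are hypothetical, and only standard JDK calls (`Thread.setContextClassLoader`, `Class.forName`) are used. The second helper also shows the fail-fast behaviour argued for in the thread, where a class that cannot be found raises an error instead of being silently ignored.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of Spark.
final class ContextClassLoaderSwap {
    private ContextClassLoaderSwap() {}

    // Runs body with the given loader installed as the current thread's
    // context class loader, and always restores the previous loader.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    // Looks up a class through the current thread's context class loader.
    // A missing class is reported immediately rather than swallowed.
    static Class<?> loadOrFail(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load " + className, e);
        }
    }
}
```

A caller would wrap the lookup, for example `withContextClassLoader(appJarLoader, () -> loadOrFail(registratorClassName))`, where `appJarLoader` and `registratorClassName` are assumed to be supplied by the surrounding code. Whether swapping the loader this way has other side effects inside Spark is the open question raised in the thread.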