In part, my assertion was based on a comment by sryza on my PR (
https://github.com/apache/spark/pull/1890#issuecomment-51805750); however,
I thought I had also seen it in the YARN code base.  Now that I look
for it, though, I can't find where this happens, so perhaps I was imagining
the YARN behaviour.


On 14 August 2014 17:57, Debasish Das <debasish.da...@gmail.com> wrote:

> By the way, I have seen this same problem while deploying 1.1.0-SNAPSHOT on
> YARN as well...
>
> So it is a common problem in both standalone and YARN deployment modes...
>
>
> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:
>
>> Hi Reynold,
>>
>> That would solve this specific issue, but you'd need to be careful never
>> to create a serialiser instance before the first task is received.
>> Currently, in Executor.TaskRunner.run, a closure serialiser instance is
>> created before any application jars are downloaded, but that could be
>> moved.  To me, this seems a little fragile.
>>
>> However, there is a related issue: you can't ship a custom serialiser
>> in an application jar, because the serialiser is instantiated when the
>> SparkEnv object is created, which is before any tasks are received by the
>> executor.  The above approach wouldn't help with that problem.
>> Additionally, the YARN scheduler already uses this approach of adding
>> the application jar to the Executor classpath, so adopting it in
>> standalone mode would make things a bit more uniform.
>>
>> Cheers,
>> Graham
>>
>>
>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Graham,
>>>
>>> Thanks for working on this. This is an important bug to fix.
>>>
>>> I don't have the whole context, and obviously I haven't spent nearly as
>>> much time on this as you have, but I'm wondering: what if we always pass the
>>> executor's ClassLoader to the Kryo serializer? Would that solve this problem?
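>>>
>>> Concretely, something along these lines (a minimal sketch, not the current
>>> KryoSerializer code; `executorClassLoader` is a stand-in for whatever loader
>>> the executor builds that contains the application jars):
>>>
>>>     import com.esotericsoftware.kryo.Kryo
>>>
>>>     def newKryo(executorClassLoader: ClassLoader): Kryo = {
>>>       val kryo = new Kryo()
>>>       // Resolve user classes (including any custom registrator) against the
>>>       // executor's loader rather than the calling thread's context loader.
>>>       kryo.setClassLoader(executorClassLoader)
>>>       kryo
>>>     }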
>>>
>>>
>>>
>>>
>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>
>>>> Hi Deb,
>>>>
>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>> Theoretically Spark supports custom serialisers, but due to a related
>>>> issue, custom serialisers currently can't live in application jars and must
>>>> be available to all executors at launch.  My PR fixes this issue as well,
>>>> allowing custom serialisers to be shipped in application jars.
>>>>
>>>> Graham
>>>>
>>>>
>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:
>>>>
>>>>> Sorry, I just saw Graham's email after sending my previous email about
>>>>> this bug...
>>>>>
>>>>> I have been seeing this same issue on our ALS runs last week, but I
>>>>> thought it was due to my hacky way of running the mllib 1.1 snapshot on core 1.0...
>>>>>
>>>>> What's the status of this PR? Will this fix be back-ported to 1.0.1,
>>>>> as we are running a 1.0.1 stable standalone cluster?
>>>>>
>>>>> Until the PR merges, does it make sense to not use Kryo? What are the
>>>>> other recommended efficient serializers?
>>>>>
>>>>> Thanks.
>>>>> Deb
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>
>>>>>> I now have a complete pull request for this issue that I'd like to get
>>>>>> reviewed and committed.  The PR is available here:
>>>>>> https://github.com/apache/spark/pull/1890 and includes a test case for
>>>>>> the issue I described.  I've also submitted a related PR (
>>>>>> https://github.com/apache/spark/pull/1827) that stops exceptions raised
>>>>>> while attempting to run the custom Kryo registrator from being swallowed.
>>>>>>
>>>>>> Thanks,
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>
>>>>>> > I've submitted a work-in-progress pull request for this issue that I'd
>>>>>> > like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
>>>>>> > also submitted a pull request for the related issue that exceptions hit
>>>>>> > when trying to use a custom Kryo registrator are being swallowed:
>>>>>> > https://github.com/apache/spark/pull/1827
>>>>>> >
>>>>>> > The approach in my pull request is to have the Worker processes download
>>>>>> > the application jars and add them to the Executor class path at launch
>>>>>> > time. There are a couple of things that still need to be done before this
>>>>>> > can be merged:
>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>> > application jars are downloaded again.  My solution here would be to make
>>>>>> > the executor skip downloading any jar that already exists locally (see the
>>>>>> > sketch after this list).  Previously, the driver & executor kept track of
>>>>>> > the timestamps of jar files and would re-download 'updated' jars; however,
>>>>>> > this never made sense, as the previous version of the updated jar may
>>>>>> > already have been loaded into the executor, so the updated jar may have no
>>>>>> > effect.  As my current pull request removes the timestamps for jars, just
>>>>>> > checking whether the jar exists will allow us to avoid downloading the
>>>>>> > jars again.
>>>>>> > 2. Tests. :-)
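>>>>>> >
>>>>>> > For item 1, the check could be as simple as this (a minimal sketch under
>>>>>> > my assumptions, not the actual PR code; the URL handling is illustrative):
>>>>>> >
>>>>>> >     import java.io.File
>>>>>> >     import java.net.URI
>>>>>> >     import java.nio.file.Files
>>>>>> >
>>>>>> >     def fetchJarIfMissing(jarUrl: String, jarDir: File): File = {
>>>>>> >       // Derive the local file name from the last path segment of the URL.
>>>>>> >       val localJar = new File(jarDir, new URI(jarUrl).getPath.split("/").last)
>>>>>> >       // With jar timestamps removed, existence alone decides whether to download.
>>>>>> >       if (!localJar.exists()) {
>>>>>> >         val in = new URI(jarUrl).toURL.openStream()
>>>>>> >         try Files.copy(in, localJar.toPath) finally in.close()
>>>>>> >       }
>>>>>> >       localJar
>>>>>> >     }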
>>>>>> >
>>>>>> > A side benefit of my pull request is that you will be able to use custom
>>>>>> > serialisers that are distributed in a user jar.  Currently, the serialiser
>>>>>> > instance is created in the Executor process before the first task is
>>>>>> > received, and therefore before any user jars are downloaded.  As this PR
>>>>>> > adds user jars to the Executor process at launch time, this won't be an
>>>>>> > issue.
>>>>>> >
>>>>>> >
>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>> >
>>>>>> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
>>>>>> >> the full stack trace, but it's in the BlockManager/BlockManagerWorker,
>>>>>> >> where it's trying to fulfil a "getBlock" request for another node.  The
>>>>>> >> objects that would be in the block haven't yet been serialised, and that
>>>>>> >> then causes the deserialisation to happen on that thread.  See
>>>>>> >> MemoryStore.scala:102.
>>>>>> >>
>>>>>> >>
>>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>>> >>
>>>>>> >>> I don't think it was a conscious design decision to not include the
>>>>>> >>> application classes in the connection manager serializer. We should fix
>>>>>> >>> that. Where is it deserializing data in that thread?
>>>>>> >>>
>>>>>> >>> #4 might make sense in the long run, but it adds a lot of complexity to
>>>>>> >>> the code base (a whole separate code path, task queue, blocking/non-blocking
>>>>>> >>> logic within task threads) that can be error prone, so I think it is best
>>>>>> >>> to stay away from that right now.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>>> Hi Spark devs,
>>>>>> >>>>
>>>>>> >>>> I’ve posted an issue on JIRA (
>>>>>> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
>>>>>> >>>> Kryo serialisation with a custom Kryo registrator to register custom
>>>>>> >>>> classes with Kryo.  This is an insidious issue that non-deterministically
>>>>>> >>>> causes Kryo to have different ID number => class name maps on different
>>>>>> >>>> nodes, which then causes weird exceptions (ClassCastException,
>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
>>>>>> >>>> time.  I’ve created a reliable reproduction for the issue here:
>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
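>>>>>> >>>>
>>>>>> >>>> For reference, the setup involved looks roughly like this (a minimal
>>>>>> >>>> sketch; MyCustomClass and com.example.MyRegistrator are hypothetical
>>>>>> >>>> names, not the ones from my reproduction):
>>>>>> >>>>
>>>>>> >>>>     import com.esotericsoftware.kryo.Kryo
>>>>>> >>>>     import org.apache.spark.SparkConf
>>>>>> >>>>     import org.apache.spark.serializer.KryoRegistrator
>>>>>> >>>>
>>>>>> >>>>     case class MyCustomClass(id: Int)  // shipped in the application jar
>>>>>> >>>>
>>>>>> >>>>     class MyRegistrator extends KryoRegistrator {
>>>>>> >>>>       override def registerClasses(kryo: Kryo) {
>>>>>> >>>>         // Registration order determines Kryo's ID number => class mapping,
>>>>>> >>>>         // so every node must run this successfully for the maps to agree.
>>>>>> >>>>         kryo.register(classOf[MyCustomClass])
>>>>>> >>>>       }
>>>>>> >>>>     }
>>>>>> >>>>
>>>>>> >>>>     val conf = new SparkConf()
>>>>>> >>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>> >>>>       .set("spark.kryo.registrator", "com.example.MyRegistrator")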
>>>>>> >>>>
>>>>>> >>>> I’m happy to try to put a pull request together to address this, but
>>>>>> >>>> the right way to solve it isn’t obvious to me, and I’d like to get
>>>>>> >>>> feedback / ideas on how to address it.
>>>>>> >>>>
>>>>>> >>>> The root cause of the problem is a “Failed to run spark.kryo.registrator”
>>>>>> >>>> error which non-deterministically occurs in some executor processes during
>>>>>> >>>> operation.  My custom Kryo registrator is in the application jar, and it is
>>>>>> >>>> accessible on the worker nodes; this is demonstrated by the fact that most
>>>>>> >>>> of the time the custom Kryo registrator is successfully run.
>>>>>> >>>>
>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation happens
>>>>>> >>>> most of the time on an “Executor task launch worker” thread, which has its
>>>>>> >>>> class loader set to contain the application jar.  This happens in
>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
>>>>>> >>>> tell, it is only these threads that have access to the application jar
>>>>>> >>>> (which contains the custom Kryo registrator).  However, the
>>>>>> >>>> ConnectionManager threads sometimes need to serialise/deserialise objects
>>>>>> >>>> to satisfy “getBlock” requests when the objects haven’t previously been
>>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the application
>>>>>> >>>> jar available from their class loader, the lookup of the custom Kryo
>>>>>> >>>> registrator fails.  Spark then swallows this exception, which results in a
>>>>>> >>>> different ID number => class mapping for this Kryo instance, and that then
>>>>>> >>>> causes deserialisation errors later on a different node.
>>>>>> >>>>
>>>>>> >>>> A related issue to the one reported in SPARK-2878 is that Spark probably
>>>>>> >>>> shouldn’t swallow the ClassNotFound exception for custom Kryo registrators.
>>>>>> >>>> The user has explicitly specified this class, and if it deterministically
>>>>>> >>>> can’t be found, then it may cause problems at serialisation /
>>>>>> >>>> deserialisation time.  If it can’t be found only sometimes (as in this
>>>>>> >>>> case), then it leads to a data corruption issue later on.  Either way,
>>>>>> >>>> we’re better off dying from the ClassNotFound exception earlier than
>>>>>> >>>> hitting the weirder errors later on.
>>>>>> >>>>
>>>>>> >>>> I have some ideas on potential solutions to this issue, but I’m keen for
>>>>>> >>>> experienced eyes to critique these approaches:
>>>>>> >>>>
>>>>>> >>>> 1. The simplest approach to fixing this would be to just make the
>>>>>> >>>> application jar available to the connection manager threads, but I’m
>>>>>> >>>> guessing it’s a design decision to isolate the application jar to just the
>>>>>> >>>> executor task runner threads.  Also, I don’t know if there are any other
>>>>>> >>>> threads that might be interacting with Kryo serialisation /
>>>>>> >>>> deserialisation.
>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the thread’s
>>>>>> >>>> class loader to include the application jar, then restore the class loader
>>>>>> >>>> after the Kryo registrator has been run (see the sketch after this list).
>>>>>> >>>> I don’t know if this would have any other side effects.
>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner threads,
>>>>>> >>>> rather than delaying serialisation until later, when it can be done only
>>>>>> >>>> if needed.  This approach would probably have negative performance
>>>>>> >>>> consequences.
>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>>> >>>> deserialisation that has the application jar on the class path.
>>>>>> >>>> Serialisation / deserialisation would be the only thing these threads do,
>>>>>> >>>> and this would minimise conflicts / interactions between the application
>>>>>> >>>> jar and other jars.
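>>>>>> >>>>
>>>>>> >>>> For option 2, I have in mind the usual save/restore pattern (a minimal
>>>>>> >>>> sketch; `applicationLoader` stands in for a loader that includes the
>>>>>> >>>> application jar, and the body would be the existing registrator
>>>>>> >>>> lookup-and-run logic):
>>>>>> >>>>
>>>>>> >>>>     def withApplicationLoader[T](applicationLoader: ClassLoader)(body: => T): T = {
>>>>>> >>>>       val thread = Thread.currentThread()
>>>>>> >>>>       val original = thread.getContextClassLoader
>>>>>> >>>>       thread.setContextClassLoader(applicationLoader)
>>>>>> >>>>       // Always restore the original loader, even if the body throws.
>>>>>> >>>>       try body finally thread.setContextClassLoader(original)
>>>>>> >>>>     }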
>>>>>> >>>>
>>>>>> >>>> #4 sounds like the best approach to me, but I think it would require
>>>>>> >>>> considerable knowledge of Spark internals, which is beyond me at present.
>>>>>> >>>> Does anyone have any better (and ideally simpler) ideas?
>>>>>> >>>>
>>>>>> >>>> Cheers,
>>>>>> >>>>
>>>>>> >>>> Graham
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
