By the way I have seen this same problem while deploying 1.1.0-SNAPSHOT on
YARN as well...

So it is a common problem in both standalone and YARN mode deployment...


On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com>
wrote:

> Hi Reynold,
>
> That would solve this specific issue, but you'd need to be careful that
> you never created a serialiser instance before the first task is received.
>  Currently in Executor.TaskRunner.run a closure serialiser instance is
> created before any application jars are downloaded, but that could be
> moved.  To me, this seems a little fragile.
>
> However there is a related issue where you can't ship a custom serialiser
> in an application jar because the serialiser is instantiated when the
> SparkEnv object is created, which is before any tasks are received by the
> executor.  The above approach wouldn't help with this problem.
>  Additionally, the YARN scheduler currently uses this approach of adding
> the application jar to the Executor classpath, so it would make things a
> bit more uniform.
>
> Cheers,
> Graham
>
>
> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>
>> Graham,
>>
>> Thanks for working on this. This is an important bug to fix.
>>
>> I don't have the whole context and obviously I haven't spent nearly as
>> much time on this as you have, but I'm wondering what if we always pass the
>> executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>>
>>
>>
>>
>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com>
>> wrote:
>>
>>> Hi Deb,
>>>
>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>  Theoretically Spark supports custom serialisers, but due to a related
>>> issue, custom serialisers currently can't live in application jars and must
>>> be available to all executors at launch.  My PR fixes this issue as well,
>>> allowing custom serialisers to be shipped in application jars.
>>>
>>> Graham
>>>
>>>
>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:
>>>
>>>> Sorry I just saw Graham's email after sending my previous email about
>>>> this bug...
>>>>
>>>> I have been seeing this same issue on our ALS runs last week but I
>>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>>>>
>>>> What's the status of this PR ? Will this fix be back-ported to 1.0.1 as
>>>> we are running 1.0.1 stable standalone cluster ?
>>>>
>>>> Till the PR merges does it make sense to not use Kryo ? What are the
>>>> other recommended efficient serializers ?
>>>>
>>>> Thanks.
>>>> Deb
>>>>
>>>>
>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com
>>>> > wrote:
>>>>
>>>>> I now have a complete pull request for this issue that I'd like to get
>>>>> reviewed and committed.  The PR is available here:
>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase for
>>>>> the
>>>>> issue I described.  I've also submitted a related PR (
>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>> raised
>>>>> while attempting to run the custom kryo registrator not to be
>>>>> swallowed.
>>>>>
>>>>> Thanks,
>>>>> Graham
>>>>>
>>>>>
>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > I've submitted a work-in-progress pull request for this issue that
>>>>> I'd
>>>>> > like feedback on.  See https://github.com/apache/spark/pull/1890 .
>>>>> I've
>>>>> > also submitted a pull request for the related issue that the
>>>>> exceptions hit
>>>>> > when trying to use a custom kryo registrator are being swallowed:
>>>>> > https://github.com/apache/spark/pull/1827
>>>>> >
>>>>> > The approach in my pull request is to get the Worker processes to
>>>>> download
>>>>> > the application jars and add them to the Executor class path at
>>>>> launch
>>>>> > time. There are a couple of things that still need to be done before
>>>>> this
>>>>> > can be merged:
>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>> > application jars are downloaded again.  My solution here would be to
>>>>> make
>>>>> > the executor not download any jars that already exist.  Previously,
>>>>> the
>>>>> > driver & executor kept track of the timestamp of jar files and would
>>>>> > redownload 'updated' jars, however this never made sense as the
>>>>> previous
>>>>> > version of the updated jar may have already been loaded into the
>>>>> executor,
>>>>> > so the updated jar may have no effect.  As my current pull request
>>>>> removes
>>>>> > the timestamp for jars, just checking whether the jar exists will
>>>>> allow us
>>>>> > to avoid downloading the jars again.
>>>>> > 2. Tests. :-)
>>>>> >
>>>>> > A side-benefit of my pull request is that you will be able to use
>>>>> custom
>>>>> > serialisers that are distributed in a user jar.  Currently, the
>>>>> serialiser
>>>>> > instance is created in the Executor process before the first task is
>>>>> > received and therefore before any user jars are downloaded.  As this
>>>>> PR
>>>>> > adds user jars to the Executor process at launch time, this won't be
>>>>> an
>>>>> > issue.
>>>>> >
>>>>> >
>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878
>>>>> for
>>>>> >> the full stacktrace, but it's in the
>>>>> BlockManager/BlockManagerWorker where
>>>>> >> it's trying to fulfil a "getBlock" request for another node.  The
>>>>> objects
>>>>> >> that would be in the block haven't yet been serialised, and that
>>>>> then
>>>>> >> causes the deserialisation to happen on that thread.  See
>>>>> >> MemoryStore.scala:102.
>>>>> >>
>>>>> >>
>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>> >>
>>>>> >>> I don't think it was a conscious design decision to not include the
>>>>> >>> application classes in the connection manager serializer. We
>>>>> should fix
>>>>> >>> that. Where is it deserializing data in that thread?
>>>>> >>>
>>>>> >>>  4 might make sense in the long run, but it adds a lot of
>>>>> complexity to
>>>>> >>> the code base (whole separate code base, task queue,
>>>>> blocking/non-blocking
>>>>> >>> logic within task threads) that can be error prone, so I think it
>>>>> is best
>>>>> >>> to stay away from that right now.
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <
>>>>> graham.den...@gmail.com>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>>> Hi Spark devs,
>>>>> >>>>
>>>>> >>>> I’ve posted an issue on JIRA (
>>>>> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs
>>>>> when
>>>>> >>>> using
>>>>> >>>> Kryo serialisation with a custom Kryo registrator to register
>>>>> custom
>>>>> >>>> classes with Kryo.  This is an insidious issue that
>>>>> >>>> non-deterministically
>>>>> >>>> causes Kryo to have different ID number => class name maps on
>>>>> different
>>>>> >>>> nodes, which then causes weird exceptions (ClassCastException,
>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>>> >>>> deserialisation
>>>>> >>>> time.  I’ve created a reliable reproduction for the issue here:
>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>> >>>>
>>>>> >>>> I’m happy to try and put a pull request together to try and
>>>>> address
>>>>> >>>> this,
>>>>> >>>> but it’s not obvious to me the right way to solve this and I’d
>>>>> like to
>>>>> >>>> get
>>>>> >>>> feedback / ideas on how to address this.
>>>>> >>>>
>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>> >>>> spark.kryo.registrator”
>>>>> >>>> error which non-deterministically occurs in some executor
>>>>> processes
>>>>> >>>> during
>>>>> >>>> operation.  My custom Kryo registrator is in the application jar,
>>>>> and
>>>>> >>>> it is
>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the fact
>>>>> that
>>>>> >>>> most
>>>>> >>>> of the time the custom kryo registrator is successfully run.
>>>>> >>>>
>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation is
>>>>> happening
>>>>> >>>> most of the time on an “Executor task launch worker” thread,
>>>>> which has
>>>>> >>>> the
>>>>> >>>> thread's class loader set to contain the application jar.  This
>>>>> happens
>>>>> >>>> in
>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from
>>>>> what I can
>>>>> >>>> tell, it is only these threads that have access to the
>>>>> application jar
>>>>> >>>> (that contains the custom Kryo registrator).  However, the
>>>>> >>>> ConnectionManager threads sometimes need to serialise/deserialise
>>>>> >>>> objects
>>>>> >>>> to satisfy “getBlock” requests when the objects haven’t
>>>>> previously been
>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the
>>>>> application
>>>>> >>>> jar available from their class loader, when it tries to look up
>>>>> the
>>>>> >>>> custom
>>>>> >>>> Kryo registrator, this fails.  Spark then swallows this
>>>>> exception, which
>>>>> >>>> results in a different ID number —> class mapping for this kryo
>>>>> >>>> instance,
>>>>> >>>> and this then causes deserialisation errors later on a different
>>>>> node.
>>>>> >>>>
>>>>> >>>> A related issue to the issue reported in SPARK-2878 is that Spark
>>>>> >>>> probably
>>>>> >>>> shouldn’t swallow the ClassNotFound exception for custom Kryo
>>>>> >>>> registrators.
>>>>> >>>>  The user has explicitly specified this class, and if it
>>>>> >>>> deterministically
>>>>> >>>> can’t be found, then it may cause problems at serialisation /
>>>>> >>>> deserialisation time.  If only sometimes it can’t be found (as in
>>>>> this
>>>>> >>>> case), then it leads to a data corruption issue later on.  Either
>>>>> way,
>>>>> >>>> we’re better off dying due to the ClassNotFound exception
>>>>> earlier, than
>>>>> >>>> the
>>>>> >>>> weirder errors later on.
>>>>> >>>>
>>>>> >>>> I have some ideas on potential solutions to this issue, but I’m
>>>>> keen for
>>>>> >>>> experienced eyes to critique these approaches:
>>>>> >>>>
>>>>> >>>> 1. The simplest approach to fixing this would be to just make the
>>>>> >>>> application jar available to the connection manager threads, but
>>>>> I’m
>>>>> >>>> guessing it’s a design decision to isolate the application jar to
>>>>> just
>>>>> >>>> the
>>>>> >>>> executor task runner threads.  Also, I don’t know if there are
>>>>> any other
>>>>> >>>> threads that might be interacting with kryo serialisation /
>>>>> >>>> deserialisation.
>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>> thread’s
>>>>> >>>> class
>>>>> >>>> loader to include the application jar, then restore the class
>>>>> loader
>>>>> >>>> after
>>>>> >>>> the kryo registrator has been run.  I don’t know if this would
>>>>> have any
>>>>> >>>> other side-effects.
>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>> threads,
>>>>> >>>> rather than delaying serialisation until later, when it can be
>>>>> done
>>>>> >>>> only if
>>>>> >>>> needed.  This approach would probably have negative performance
>>>>> >>>> consequences.
>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>> >>>> deserialisation that has the application jar on the class path.
>>>>> >>>>  Serialisation / deserialisation would be the only thing these
>>>>> threads
>>>>> >>>> do,
>>>>> >>>> and this would minimise conflicts / interactions between the
>>>>> application
>>>>> >>>> jar and other jars.
>>>>> >>>>
>>>>> >>>> #4 sounds like the best approach to me, but I think would require
>>>>> >>>> considerable knowledge of Spark internals, which is beyond me at
>>>>> >>>> present.
>>>>> >>>>  Does anyone have any better (and ideally simpler) ideas?
>>>>> >>>>
>>>>> >>>> Cheers,
>>>>> >>>>
>>>>> >>>> Graham
>>>>> >>>>
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to