Yes, I understand it might not work for custom serializer, but that is a
much less common path.

Basically I want a quick fix for 1.1 release (which is coming up soon). I
would not be comfortable making big changes to class path late into the
release cycle. We can do that for 1.2.





On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com>
wrote:

> That should work, but would you also make these changes to the
> JavaSerializer?  The API of these is the same so that you can select one or
> the other (or in theory a custom serializer)?  This also wouldn't address
> the problem of shipping custom *serializers* (not kryo registrators) in
> user jars.
>
> On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:
>
>> Graham,
>>
>> SparkEnv only creates a KryoSerializer, but as I understand that
>> serializer doesn't actually initializes the registrator since that is only
>> called when newKryo() is called when KryoSerializerInstance is initialized.
>>
>> Basically I'm thinking a quick fix for 1.2:
>>
>> 1. Add a classLoader field to KryoSerializer; initialize new
>> KryoSerializerInstance with that class loader
>>
>>  2. Set that classLoader to the executor's class loader when Executor is
>> initialized.
>>
>> Then all deser calls should be using the executor's class loader.
>>
>>
>>
>>
>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com>
>> wrote:
>>
>>> Hi Reynold,
>>>
>>> That would solve this specific issue, but you'd need to be careful that
>>> you never created a serialiser instance before the first task is received.
>>>  Currently in Executor.TaskRunner.run a closure serialiser instance is
>>> created before any application jars are downloaded, but that could be
>>> moved.  To me, this seems a little fragile.
>>>
>>> However there is a related issue where you can't ship a custom
>>> serialiser in an application jar because the serialiser is instantiated
>>> when the SparkEnv object is created, which is before any tasks are received
>>> by the executor.  The above approach wouldn't help with this problem.
>>>  Additionally, the YARN scheduler currently uses this approach of adding
>>> the application jar to the Executor classpath, so it would make things a
>>> bit more uniform.
>>>
>>> Cheers,
>>> Graham
>>>
>>>
>>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Graham,
>>>>
>>>> Thanks for working on this. This is an important bug to fix.
>>>>
>>>>  I don't have the whole context and obviously I haven't spent nearly
>>>> as much time on this as you have, but I'm wondering what if we always pass
>>>> the executor's ClassLoader to the Kryo serializer? Will that solve this
>>>> problem?
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <
>>>> graham.den...@gmail.com> wrote:
>>>>
>>>>> Hi Deb,
>>>>>
>>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>>>  Theoretically Spark supports custom serialisers, but due to a related
>>>>> issue, custom serialisers currently can't live in application jars and 
>>>>> must
>>>>> be available to all executors at launch.  My PR fixes this issue as well,
>>>>> allowing custom serialisers to be shipped in application jars.
>>>>>
>>>>> Graham
>>>>>
>>>>>
>>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry I just saw Graham's email after sending my previous email about
>>>>>> this bug...
>>>>>>
>>>>>> I have been seeing this same issue on our ALS runs last week but I
>>>>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>>>>>>
>>>>>> What's the status of this PR ? Will this fix be back-ported to 1.0.1
>>>>>> as we are running 1.0.1 stable standalone cluster ?
>>>>>>
>>>>>> Till the PR merges does it make sense to not use Kryo ? What are the
>>>>>> other recommended efficient serializers ?
>>>>>>
>>>>>> Thanks.
>>>>>> Deb
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>>> graham.den...@gmail.com> wrote:
>>>>>>
>>>>>>> I now have a complete pull request for this issue that I'd like to
>>>>>>> get
>>>>>>> reviewed and committed.  The PR is available here:
>>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase
>>>>>>> for the
>>>>>>> issue I described.  I've also submitted a related PR (
>>>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>>> raised
>>>>>>> while attempting to run the custom kryo registrator not to be
>>>>>>> swallowed.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Graham
>>>>>>>
>>>>>>>
>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > I've submitted a work-in-progress pull request for this issue that
>>>>>>> I'd
>>>>>>> > like feedback on.  See https://github.com/apache/spark/pull/1890
>>>>>>> . I've
>>>>>>> > also submitted a pull request for the related issue that the
>>>>>>> exceptions hit
>>>>>>> > when trying to use a custom kryo registrator are being swallowed:
>>>>>>> > https://github.com/apache/spark/pull/1827
>>>>>>> >
>>>>>>> > The approach in my pull request is to get the Worker processes to
>>>>>>> download
>>>>>>> > the application jars and add them to the Executor class path at
>>>>>>> launch
>>>>>>> > time. There are a couple of things that still need to be done
>>>>>>> before this
>>>>>>> > can be merged:
>>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>>> > application jars are downloaded again.  My solution here would be
>>>>>>> to make
>>>>>>> > the executor not download any jars that already exist.
>>>>>>>  Previously, the
>>>>>>> > driver & executor kept track of the timestamp of jar files and
>>>>>>> would
>>>>>>> > redownload 'updated' jars, however this never made sense as the
>>>>>>> previous
>>>>>>> > version of the updated jar may have already been loaded into the
>>>>>>> executor,
>>>>>>> > so the updated jar may have no effect.  As my current pull request
>>>>>>> removes
>>>>>>> > the timestamp for jars, just checking whether the jar exists will
>>>>>>> allow us
>>>>>>> > to avoid downloading the jars again.
>>>>>>> > 2. Tests. :-)
>>>>>>> >
>>>>>>> > A side-benefit of my pull request is that you will be able to use
>>>>>>> custom
>>>>>>> > serialisers that are distributed in a user jar.  Currently, the
>>>>>>> serialiser
>>>>>>> > instance is created in the Executor process before the first task
>>>>>>> is
>>>>>>> > received and therefore before any user jars are downloaded.  As
>>>>>>> this PR
>>>>>>> > adds user jars to the Executor process at launch time, this won't
>>>>>>> be an
>>>>>>> > issue.
>>>>>>> >
>>>>>>> >
>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> >> See my comment on
>>>>>>> https://issues.apache.org/jira/browse/SPARK-2878 for
>>>>>>> >> the full stacktrace, but it's in the
>>>>>>> BlockManager/BlockManagerWorker where
>>>>>>> >> it's trying to fulfil a "getBlock" request for another node.  The
>>>>>>> objects
>>>>>>> >> that would be in the block haven't yet been serialised, and that
>>>>>>> then
>>>>>>> >> causes the deserialisation to happen on that thread.  See
>>>>>>> >> MemoryStore.scala:102.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>>>> >>
>>>>>>> >>> I don't think it was a conscious design decision to not include
>>>>>>> the
>>>>>>> >>> application classes in the connection manager serializer. We
>>>>>>> should fix
>>>>>>> >>> that. Where is it deserializing data in that thread?
>>>>>>> >>>
>>>>>>> >>>  4 might make sense in the long run, but it adds a lot of
>>>>>>> complexity to
>>>>>>> >>> the code base (whole separate code base, task queue,
>>>>>>> blocking/non-blocking
>>>>>>> >>> logic within task threads) that can be error prone, so I think
>>>>>>> it is best
>>>>>>> >>> to stay away from that right now.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <
>>>>>>> graham.den...@gmail.com>
>>>>>>> >>> wrote:
>>>>>>> >>>
>>>>>>> >>>> Hi Spark devs,
>>>>>>> >>>>
>>>>>>> >>>> I’ve posted an issue on JIRA (
>>>>>>> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs
>>>>>>> when
>>>>>>> >>>> using
>>>>>>> >>>> Kryo serialisation with a custom Kryo registrator to register
>>>>>>> custom
>>>>>>> >>>> classes with Kryo.  This is an insidious issue that
>>>>>>> >>>> non-deterministically
>>>>>>> >>>> causes Kryo to have different ID number => class name maps on
>>>>>>> different
>>>>>>> >>>> nodes, which then causes weird exceptions (ClassCastException,
>>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>>>>> >>>> deserialisation
>>>>>>> >>>> time.  I’ve created a reliable reproduction for the issue here:
>>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>> >>>>
>>>>>>> >>>> I’m happy to try and put a pull request together to try and
>>>>>>> address
>>>>>>> >>>> this,
>>>>>>> >>>> but it’s not obvious to me the right way to solve this and I’d
>>>>>>> like to
>>>>>>> >>>> get
>>>>>>> >>>> feedback / ideas on how to address this.
>>>>>>> >>>>
>>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>>> >>>> spark.kryo.registrator”
>>>>>>> >>>> error which non-deterministically occurs in some executor
>>>>>>> processes
>>>>>>> >>>> during
>>>>>>> >>>> operation.  My custom Kryo registrator is in the application
>>>>>>> jar, and
>>>>>>> >>>> it is
>>>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the
>>>>>>> fact that
>>>>>>> >>>> most
>>>>>>> >>>> of the time the custom kryo registrator is successfully run.
>>>>>>> >>>>
>>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation is
>>>>>>> happening
>>>>>>> >>>> most of the time on an “Executor task launch worker” thread,
>>>>>>> which has
>>>>>>> >>>> the
>>>>>>> >>>> thread's class loader set to contain the application jar.  This
>>>>>>> happens
>>>>>>> >>>> in
>>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from
>>>>>>> what I can
>>>>>>> >>>> tell, it is only these threads that have access to the
>>>>>>> application jar
>>>>>>> >>>> (that contains the custom Kryo registrator).  However, the
>>>>>>> >>>> ConnectionManager threads sometimes need to
>>>>>>> serialise/deserialise
>>>>>>> >>>> objects
>>>>>>> >>>> to satisfy “getBlock” requests when the objects haven’t
>>>>>>> previously been
>>>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the
>>>>>>> application
>>>>>>> >>>> jar available from their class loader, when it tries to look up
>>>>>>> the
>>>>>>> >>>> custom
>>>>>>> >>>> Kryo registrator, this fails.  Spark then swallows this
>>>>>>> exception, which
>>>>>>> >>>> results in a different ID number —> class mapping for this kryo
>>>>>>> >>>> instance,
>>>>>>> >>>> and this then causes deserialisation errors later on a
>>>>>>> different node.
>>>>>>> >>>>
>>>>>>> >>>> A related issue to the issue reported in SPARK-2878 is that
>>>>>>> Spark
>>>>>>> >>>> probably
>>>>>>> >>>> shouldn’t swallow the ClassNotFound exception for custom Kryo
>>>>>>> >>>> registrators.
>>>>>>> >>>>  The user has explicitly specified this class, and if it
>>>>>>> >>>> deterministically
>>>>>>> >>>> can’t be found, then it may cause problems at serialisation /
>>>>>>> >>>> deserialisation time.  If only sometimes it can’t be found (as
>>>>>>> in this
>>>>>>> >>>> case), then it leads to a data corruption issue later on.
>>>>>>>  Either way,
>>>>>>> >>>> we’re better off dying due to the ClassNotFound exception
>>>>>>> earlier, than
>>>>>>> >>>> the
>>>>>>> >>>> weirder errors later on.
>>>>>>> >>>>
>>>>>>> >>>> I have some ideas on potential solutions to this issue, but I’m
>>>>>>> keen for
>>>>>>> >>>> experienced eyes to critique these approaches:
>>>>>>> >>>>
>>>>>>> >>>> 1. The simplest approach to fixing this would be to just make
>>>>>>> the
>>>>>>> >>>> application jar available to the connection manager threads,
>>>>>>> but I’m
>>>>>>> >>>> guessing it’s a design decision to isolate the application jar
>>>>>>> to just
>>>>>>> >>>> the
>>>>>>> >>>> executor task runner threads.  Also, I don’t know if there are
>>>>>>> any other
>>>>>>> >>>> threads that might be interacting with kryo serialisation /
>>>>>>> >>>> deserialisation.
>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>>> thread’s
>>>>>>> >>>> class
>>>>>>> >>>> loader to include the application jar, then restore the class
>>>>>>> loader
>>>>>>> >>>> after
>>>>>>> >>>> the kryo registrator has been run.  I don’t know if this would
>>>>>>> have any
>>>>>>> >>>> other side-effects.
>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>>> threads,
>>>>>>> >>>> rather than delaying serialisation until later, when it can be
>>>>>>> done
>>>>>>> >>>> only if
>>>>>>> >>>> needed.  This approach would probably have negative performance
>>>>>>> >>>> consequences.
>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>>>> >>>> deserialisation that has the application jar on the class path.
>>>>>>> >>>>  Serialisation / deserialisation would be the only thing these
>>>>>>> threads
>>>>>>> >>>> do,
>>>>>>> >>>> and this would minimise conflicts / interactions between the
>>>>>>> application
>>>>>>> >>>> jar and other jars.
>>>>>>> >>>>
>>>>>>> >>>> #4 sounds like the best approach to me, but I think would
>>>>>>> require
>>>>>>> >>>> considerable knowledge of Spark internals, which is beyond me at
>>>>>>> >>>> present.
>>>>>>> >>>>  Does anyone have any better (and ideally simpler) ideas?
>>>>>>> >>>>
>>>>>>> >>>> Cheers,
>>>>>>> >>>>
>>>>>>> >>>> Graham
>>>>>>> >>>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to