Is there a fix that I can test? I have the flows set up for both standalone
and YARN runs...

Thanks.
Deb



On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <r...@databricks.com> wrote:

> Yes, I understand it might not work for a custom serializer, but that is
> a much less common path.
>
> Basically I want a quick fix for the 1.1 release (which is coming up
> soon). I would not be comfortable making big changes to class path
> handling late in the release cycle. We can do that for 1.2.
>
>
>
>
>
> On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com>
> wrote:
>
>> That should work, but would you also make these changes to the
>> JavaSerializer?  The two serializers share the same API precisely so
>> that you can select one or the other (or, in theory, a custom
>> serializer).  This also wouldn't address the problem of shipping custom
>> *serialisers* (not Kryo registrators) in user jars.
>>
>> On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Graham,
>>>
>>> SparkEnv only creates a KryoSerializer, but as I understand it, that
>>> serializer doesn't actually initialize the registrator, since that only
>>> happens when newKryo() is called during KryoSerializerInstance
>>> initialization.
>>>
>>> Basically I'm thinking of a quick fix for 1.1:
>>>
>>> 1. Add a classLoader field to KryoSerializer; initialize each new
>>> KryoSerializerInstance with that class loader.
>>>
>>> 2. Set that classLoader to the executor's class loader when the
>>> Executor is initialized.
>>>
>>> Then all deser calls should be using the executor's class loader.
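>>>
>>> In code, I'm imagining something roughly like this (a sketch only; the
>>> names are illustrative, not the actual implementation):
>>>
>>>   class KryoSerializer(conf: SparkConf) extends Serializer {
>>>     // Defaults to the loader that created the serializer; the Executor
>>>     // overwrites this with its own class loader at startup.
>>>     @volatile var defaultClassLoader: ClassLoader =
>>>       Thread.currentThread.getContextClassLoader
>>>
>>>     override def newInstance(): SerializerInstance =
>>>       new KryoSerializerInstance(this, defaultClassLoader)
>>>   }
>>>
>>>   // In Executor initialization, once the executor's loader exists:
>>>   //   SparkEnv.get.serializer match {
>>>   //     case k: KryoSerializer => k.defaultClassLoader = executorLoader
>>>   //     case _ => // JavaSerializer would need the same treatment
>>>   //   }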
>>>
>>>
>>>
>>>
>>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>
>>>> Hi Reynold,
>>>>
>>>> That would solve this specific issue, but you'd need to be careful
>>>> never to create a serialiser instance before the first task is
>>>> received.  Currently, in Executor.TaskRunner.run, a closure serialiser
>>>> instance is created before any application jars are downloaded, but
>>>> that could be moved.  To me, this seems a little fragile.
>>>>
>>>> However, there is a related issue where you can't ship a custom
>>>> serialiser in an application jar, because the serialiser is
>>>> instantiated when the SparkEnv object is created, which is before any
>>>> tasks are received by the executor.  The above approach wouldn't help
>>>> with that problem.  Additionally, the YARN scheduler already uses the
>>>> approach of adding the application jar to the Executor classpath, so
>>>> adopting it here would make things a bit more uniform.
>>>>
>>>> Cheers,
>>>> Graham
>>>>
>>>>
>>>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>>>
>>>>> Graham,
>>>>>
>>>>> Thanks for working on this. This is an important bug to fix.
>>>>>
>>>>> I don't have the whole context, and obviously I haven't spent nearly
>>>>> as much time on this as you have, but I'm wondering: what if we always
>>>>> pass the executor's ClassLoader to the Kryo serializer? Would that
>>>>> solve this problem?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>
>>>>>> Hi Deb,
>>>>>>
>>>>>> The only alternative serialiser is the JavaSerializer (the default).
>>>>>> Theoretically Spark supports custom serialisers, but due to a related
>>>>>> issue, custom serialisers currently can't live in application jars
>>>>>> and must be available to all executors at launch.  My PR fixes this
>>>>>> issue as well, allowing custom serialisers to be shipped in
>>>>>> application jars.
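>>>>>>
>>>>>> For reference, selecting a serialiser is just configuration (the
>>>>>> registrator class name below is an example):
>>>>>>
>>>>>>   val conf = new SparkConf()
>>>>>>     .set("spark.serializer",
>>>>>>          "org.apache.spark.serializer.KryoSerializer")
>>>>>>     .set("spark.kryo.registrator", "com.example.MyRegistrator")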
>>>>>>
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sorry, I just saw Graham's email after sending my previous email
>>>>>>> about this bug...
>>>>>>>
>>>>>>> I have been seeing this same issue on our ALS runs last week, but I
>>>>>>> thought it was due to my hacky way of running the MLlib 1.1 snapshot
>>>>>>> on core 1.0...
>>>>>>>
>>>>>>> What's the status of this PR? Will this fix be back-ported to 1.0.1,
>>>>>>> as we are running a stable 1.0.1 standalone cluster?
>>>>>>>
>>>>>>> Until the PR merges, does it make sense to not use Kryo? What are
>>>>>>> the other recommended efficient serializers?
>>>>>>>
>>>>>>> Thanks.
>>>>>>> Deb
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I now have a complete pull request for this issue that I'd like to
>>>>>>>> get reviewed and committed.  The PR is available here:
>>>>>>>> https://github.com/apache/spark/pull/1890 and includes a test case
>>>>>>>> for the issue I described.  I've also submitted a related PR
>>>>>>>> (https://github.com/apache/spark/pull/1827) that prevents exceptions
>>>>>>>> raised while attempting to run the custom Kryo registrator from
>>>>>>>> being swallowed.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Graham
>>>>>>>>
>>>>>>>>
>>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > I've submitted a work-in-progress pull request for this issue
>>>>>>>> > that I'd like feedback on: https://github.com/apache/spark/pull/1890
>>>>>>>> > I've also submitted a pull request for the related issue that
>>>>>>>> > exceptions hit when trying to use a custom Kryo registrator are
>>>>>>>> > being swallowed: https://github.com/apache/spark/pull/1827
>>>>>>>> >
>>>>>>>> > The approach in my pull request is to get the Worker processes to
>>>>>>>> > download the application jars and add them to the Executor class
>>>>>>>> > path at launch time. There are a couple of things that still need
>>>>>>>> > to be done before this can be merged:
>>>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>>>> > application jars are downloaded again.  My solution here would be
>>>>>>>> > to make the executor skip downloading any jar that already exists
>>>>>>>> > (see the sketch after this list).  Previously, the driver and
>>>>>>>> > executor kept track of the timestamps of jar files and would
>>>>>>>> > re-download 'updated' jars; however, this never made sense, as the
>>>>>>>> > previous version of the updated jar may already have been loaded
>>>>>>>> > into the executor, so the updated jar might have no effect.  As my
>>>>>>>> > current pull request removes the timestamp for jars, just checking
>>>>>>>> > whether the jar exists will let us avoid downloading it again.
>>>>>>>> > 2. Tests. :-)
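>>>>>>>> >
>>>>>>>> > For (1), the check would be something like this sketch (names are
>>>>>>>> > illustrative, not the actual Spark code; assumes java.io.File and
>>>>>>>> > java.net.URI are imported):
>>>>>>>> >
>>>>>>>> >   // Fetch a jar only if the executor doesn't already have it.
>>>>>>>> >   def fetchJarIfMissing(jarUrl: String, targetDir: File): File = {
>>>>>>>> >     val name = new URI(jarUrl).getPath.split("/").last
>>>>>>>> >     val localFile = new File(targetDir, name)
>>>>>>>> >     if (!localFile.exists()) {
>>>>>>>> >       // download jarUrl into localFile (e.g. via Utils.fetchFile)
>>>>>>>> >     }
>>>>>>>> >     localFile
>>>>>>>> >   }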
>>>>>>>> >
>>>>>>>> > A side-benefit of my pull request is that you will be able to use
>>>>>>>> > custom serialisers that are distributed in a user jar.  Currently,
>>>>>>>> > the serialiser instance is created in the Executor process before
>>>>>>>> > the first task is received, and therefore before any user jars are
>>>>>>>> > downloaded.  As this PR adds user jars to the Executor process at
>>>>>>>> > launch time, this won't be an issue.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878
>>>>>>>> >> for the full stacktrace, but it's in the
>>>>>>>> >> BlockManager/BlockManagerWorker, where it's trying to fulfil a
>>>>>>>> >> "getBlock" request from another node.  The objects that would be
>>>>>>>> >> in the block haven't yet been serialised, and that then causes the
>>>>>>>> >> serialisation to happen on that thread.  See MemoryStore.scala:102.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>> >>
>>>>>>>> >>> I don't think it was a conscious design decision to not include
>>>>>>>> >>> the application classes in the connection manager serializer. We
>>>>>>>> >>> should fix that. Where is it deserializing data in that thread?
>>>>>>>> >>>
>>>>>>>> >>> #4 might make sense in the long run, but it adds a lot of
>>>>>>>> >>> complexity to the code base (a whole separate code base, a task
>>>>>>>> >>> queue, blocking/non-blocking logic within task threads) that can
>>>>>>>> >>> be error-prone, so I think it is best to stay away from that
>>>>>>>> >>> right now.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>>> Hi Spark devs,
>>>>>>>> >>>>
>>>>>>>> >>>> I've posted an issue on JIRA
>>>>>>>> >>>> (https://issues.apache.org/jira/browse/SPARK-2878) about a bug
>>>>>>>> >>>> that occurs when using Kryo serialisation with a custom Kryo
>>>>>>>> >>>> registrator to register custom classes with Kryo.  This is an
>>>>>>>> >>>> insidious issue that non-deterministically causes Kryo to have
>>>>>>>> >>>> different ID number => class name maps on different nodes, which
>>>>>>>> >>>> then causes weird exceptions (ClassCastException,
>>>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>>>>>> >>>> deserialisation time.  I've created a reliable reproduction for
>>>>>>>> >>>> the issue here:
>>>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
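>>>>>>>> >>>>
>>>>>>>> >>>> For context, a custom registrator is just a class like the
>>>>>>>> >>>> following (the class names here are examples).  Kryo assigns
>>>>>>>> >>>> numeric IDs to classes in registration order, which is why a
>>>>>>>> >>>> skipped registrator leaves different ID maps on different nodes:
>>>>>>>> >>>>
>>>>>>>> >>>>   import com.esotericsoftware.kryo.Kryo
>>>>>>>> >>>>   import org.apache.spark.serializer.KryoRegistrator
>>>>>>>> >>>>
>>>>>>>> >>>>   class MyRegistrator extends KryoRegistrator {
>>>>>>>> >>>>     override def registerClasses(kryo: Kryo) {
>>>>>>>> >>>>       // Each register call assigns the next numeric class ID.
>>>>>>>> >>>>       kryo.register(classOf[MyCustomClass])
>>>>>>>> >>>>     }
>>>>>>>> >>>>   }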
>>>>>>>> >>>>
>>>>>>>> >>>> I'm happy to put a pull request together to address this, but
>>>>>>>> >>>> the right way to solve it isn't obvious to me, and I'd like to
>>>>>>>> >>>> get feedback / ideas on possible approaches.
>>>>>>>> >>>>
>>>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>>>> >>>> spark.kryo.registrator" error which non-deterministically occurs
>>>>>>>> >>>> in some executor processes during operation.  My custom Kryo
>>>>>>>> >>>> registrator is in the application jar, and it is accessible on
>>>>>>>> >>>> the worker nodes; this is demonstrated by the fact that most of
>>>>>>>> >>>> the time the custom Kryo registrator runs successfully.
>>>>>>>> >>>>
>>>>>>>> >>>> What's happening is that Kryo serialisation/deserialisation
>>>>>>>> >>>> happens most of the time on an "Executor task launch worker"
>>>>>>>> >>>> thread, whose context class loader is set to contain the
>>>>>>>> >>>> application jar.  This happens in
>>>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from
>>>>>>>> >>>> what I can tell, only these threads have access to the
>>>>>>>> >>>> application jar (which contains the custom Kryo registrator).
>>>>>>>> >>>> However, the ConnectionManager threads sometimes need to
>>>>>>>> >>>> serialise/deserialise objects to satisfy "getBlock" requests
>>>>>>>> >>>> when the objects haven't previously been serialised.  As the
>>>>>>>> >>>> ConnectionManager threads don't have the application jar
>>>>>>>> >>>> available from their class loader, the attempt to look up the
>>>>>>>> >>>> custom Kryo registrator fails on those threads.  Spark then
>>>>>>>> >>>> swallows this exception, which results in a different ID number
>>>>>>>> >>>> => class mapping for this Kryo instance, and this then causes
>>>>>>>> >>>> deserialisation errors later on a different node.
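>>>>>>>> >>>>
>>>>>>>> >>>> Roughly, the registrator lookup looks like this simplified
>>>>>>>> >>>> sketch (not the exact Spark code), which shows why the outcome
>>>>>>>> >>>> depends on which thread runs it:
>>>>>>>> >>>>
>>>>>>>> >>>>   val regClassName = conf.get("spark.kryo.registrator")
>>>>>>>> >>>>   // Resolved against the *current thread's* context loader:
>>>>>>>> >>>>   val registrator =
>>>>>>>> >>>>     Class.forName(regClassName, true,
>>>>>>>> >>>>         Thread.currentThread.getContextClassLoader)
>>>>>>>> >>>>       .newInstance().asInstanceOf[KryoRegistrator]
>>>>>>>> >>>>   // On a task launch worker thread the context loader includes
>>>>>>>> >>>>   // the application jar, so this succeeds; on a
>>>>>>>> >>>>   // ConnectionManager thread it doesn't, and Class.forName
>>>>>>>> >>>>   // throws ClassNotFoundException.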
>>>>>>>> >>>>
>>>>>>>> >>>> An issue related to the one reported in SPARK-2878 is that
>>>>>>>> >>>> Spark probably shouldn't swallow the ClassNotFoundException for
>>>>>>>> >>>> custom Kryo registrators.  The user has explicitly specified
>>>>>>>> >>>> this class, and if it deterministically can't be found, then it
>>>>>>>> >>>> may cause problems at serialisation / deserialisation time.  If
>>>>>>>> >>>> it can't be found only some of the time (as in this case), then
>>>>>>>> >>>> it leads to a data corruption issue later on.  Either way, we're
>>>>>>>> >>>> better off dying early with the ClassNotFoundException than
>>>>>>>> >>>> hitting the weirder errors later on.
>>>>>>>> >>>>
>>>>>>>> >>>> I have some ideas on potential solutions to this issue, but I'm
>>>>>>>> >>>> keen for experienced eyes to critique these approaches:
>>>>>>>> >>>>
>>>>>>>> >>>> 1. The simplest approach to fixing this would be to just make
>>>>>>>> >>>> the application jar available to the connection manager threads,
>>>>>>>> >>>> but I'm guessing it's a design decision to isolate the
>>>>>>>> >>>> application jar to just the executor task runner threads.  Also,
>>>>>>>> >>>> I don't know if there are any other threads that might be
>>>>>>>> >>>> interacting with Kryo serialisation / deserialisation.
>>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>>>> >>>> thread's class loader to include the application jar, then
>>>>>>>> >>>> restore the class loader after the Kryo registrator has been run
>>>>>>>> >>>> (see the sketch after this list).  I don't know if this would
>>>>>>>> >>>> have any other side-effects.
>>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>>>> >>>> threads, rather than delaying serialisation until later, when it
>>>>>>>> >>>> can be done only if needed.  This approach would probably have
>>>>>>>> >>>> negative performance consequences.
>>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>>>>> >>>> deserialisation that has the application jar on the class path.
>>>>>>>> >>>> Serialisation / deserialisation would be the only thing these
>>>>>>>> >>>> threads do, and this would minimise conflicts / interactions
>>>>>>>> >>>> between the application jar and other jars.
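>>>>>>>> >>>>
>>>>>>>> >>>> For option 2, the sketch I have in mind is roughly the
>>>>>>>> >>>> following (assuming a reference to a class loader that includes
>>>>>>>> >>>> the application jar is available at that point; the names here
>>>>>>>> >>>> are illustrative):
>>>>>>>> >>>>
>>>>>>>> >>>>   val thread = Thread.currentThread
>>>>>>>> >>>>   val originalLoader = thread.getContextClassLoader
>>>>>>>> >>>>   thread.setContextClassLoader(loaderWithApplicationJar)
>>>>>>>> >>>>   try {
>>>>>>>> >>>>     // Look up and run the registrator with the app jar visible.
>>>>>>>> >>>>     registrator.registerClasses(kryo)
>>>>>>>> >>>>   } finally {
>>>>>>>> >>>>     thread.setContextClassLoader(originalLoader)
>>>>>>>> >>>>   }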
>>>>>>>> >>>>
>>>>>>>> >>>> #4 sounds like the best approach to me, but I think it would
>>>>>>>> >>>> require considerable knowledge of Spark internals, which is
>>>>>>>> >>>> beyond me at present.  Does anyone have any better (and ideally
>>>>>>>> >>>> simpler) ideas?
>>>>>>>> >>>>
>>>>>>>> >>>> Cheers,
>>>>>>>> >>>>
>>>>>>>> >>>> Graham
>>>>>>>> >>>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
