Ah, got it.  While that would be useful, it doesn't address the more
general (and potentially even more beneficial) case where the total number
of worker nodes is fully elastic.  That already starts to push you into the
direction of spitting Spark worker and HDFS data nodes into disjoint sets,
and to compensate for the loss of data locality you start wishing for some
kind of hierarchical storage where at least your hot data can be present on
the Spark workers.  Even without an elastic number of HDFS nodes, you might
well get into a similar kind of desire for hierarchical storage another
layer providing faster access to the shuffle files than is possible using
HDFS -- because I share Reynold's scepticism that HDFS by itself will be up
to demands of handling the shuffle files.  With such a hierarchical split
or Spark-node-local caching layer, considering the more general split
between data and fully elastic worker nodes becomes much more tractable.

On Thu, Apr 28, 2016 at 11:23 AM, Michael Gummelt <mgumm...@mesosphere.io>
wrote:

> Not disjoint.  Colocated.  By "shrinking", I don't mean any nodes are
> going away.  I mean executors are decreasing in number, which is the case
> with dynamic allocation.  HDFS nodes aren't decreasing in number though,
> and we can still colocate on those nodes, as always.
>
> On Thu, Apr 28, 2016 at 11:19 AM, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> So you are only considering the case where your set of HDFS nodes is
>> disjoint from your dynamic set of Spark Worker nodes?  That would seem to
>> be a pretty significant sacrifice of data locality.
>>
>> On Thu, Apr 28, 2016 at 11:15 AM, Michael Gummelt <mgumm...@mesosphere.io
>> > wrote:
>>
>>> > if after a work-load burst your cluster dynamically changes from 10000
>>> workers to 1000, will the typical HDFS replication factor be sufficient to
>>> retain access to the shuffle files in HDFS
>>>
>>> HDFS isn't resizing.  Spark is.  HDFS files should be HA and durable.
>>>
>>> On Thu, Apr 28, 2016 at 11:08 AM, Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>> Yes, replicated and distributed shuffle materializations are key
>>>> requirement to maintain performance in a fully elastic cluster where
>>>> Executors aren't just reallocated across an essentially fixed number of
>>>> Worker nodes, but rather the number of Workers itself is dynamic.
>>>> Retaining the file interface to those shuffle materializations while also
>>>> using HDFS for the spark.local.dirs has a certain amount of attraction, but
>>>> I also wonder whether a typical HDFS deployment is really sufficient to
>>>> handle this kind of elastic cluster scaling.  For instance and assuming
>>>> HDFS co-located on worker nodes, if after a work-load burst your cluster
>>>> dynamically changes from 10000 workers to 1000, will the typical HDFS
>>>> replication factor be sufficient to retain access to the shuffle files in
>>>> HDFS, or will we instead be seeing numerous FetchFailure exceptions, Tasks
>>>> recomputed or Stages aborted, etc. so that the net effect is not all that
>>>> much different than if the shuffle files had not been relocated to HDFS and
>>>> the Executors or ShuffleService instances had just disappeared along with
>>>> the worker nodes?
>>>>
>>>> On Thu, Apr 28, 2016 at 10:46 AM, Michael Gummelt <
>>>> mgumm...@mesosphere.io> wrote:
>>>>
>>>>> > Why would you run the shuffle service on 10K nodes but Spark
>>>>> executors
>>>>> on just 100 nodes? wouldn't you also run that service just on the 100
>>>>> nodes?
>>>>>
>>>>> We have to start the service beforehand, out of band, and we don't
>>>>> know a priori where the Spark executors will land.  Those 100 executors
>>>>> could land on any of the 10K nodes.
>>>>>
>>>>> > What does plumbing it through HDFS buy you in comparison?
>>>>>
>>>>> It drops the shuffle service requirement, which is HUGE.  It means
>>>>> Spark can completely vacate the machine when it's not in use, which is
>>>>> crucial for a large, multi-tenant cluster.  ShuffledRDDs can now read the
>>>>> map files from HDFS, rather than the ancestor executors, which means we 
>>>>> can
>>>>> shut executors down immediately after the shuffle files are written.
>>>>>
>>>>> > There's some additional overhead and if anything you lose some
>>>>> control over locality, in a context where I presume HDFS itself is storing
>>>>> data on much more than the 100 Spark nodes.
>>>>>
>>>>> Write locality would be sacrificed, but the descendent executors were
>>>>> already doing a remote read (they have to read from multiple ancestor
>>>>> executors), so there's no additional cost in read locality.  In fact, if 
>>>>> we
>>>>> take advantage of HDFS's favored node feature, we could make it likely 
>>>>> that
>>>>> all map files for a given partition land on the same node, so the
>>>>> descendent executor would never have to do a remote read!  We'd 
>>>>> effectively
>>>>> shift the remote IO from read side to write side, for theoretically no
>>>>> change in performance.
>>>>>
>>>>> In summary:
>>>>>
>>>>> Advantages:
>>>>> - No shuffle service dependency (increased utilization, decreased
>>>>> management cost)
>>>>> - Shut executors down immediately after shuffle files are written,
>>>>> rather than waiting for a timeout (increased utilization)
>>>>> - HDFS is HA, so shuffle files survive a node failure, which isn't
>>>>> true for the shuffle service (decreased latency during failures)
>>>>> - Potential ability to parallelize shuffle file reads if we write a
>>>>> new shuffle iterator (decreased latency)
>>>>>
>>>>> Disadvantages
>>>>> - Increased write latency (but potentially not if we implement it
>>>>> efficiently.  See above).
>>>>> - Would need some sort of GC on HDFS shuffle files
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Apr 28, 2016 at 1:36 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>>
>>>>>> Why would you run the shuffle service on 10K nodes but Spark executors
>>>>>> on just 100 nodes? wouldn't you also run that service just on the 100
>>>>>> nodes?
>>>>>>
>>>>>> What does plumbing it through HDFS buy you in comparison? There's some
>>>>>> additional overhead and if anything you lose some control over
>>>>>> locality, in a context where I presume HDFS itself is storing data on
>>>>>> much more than the 100 Spark nodes.
>>>>>>
>>>>>> On Thu, Apr 28, 2016 at 1:34 AM, Michael Gummelt <
>>>>>> mgumm...@mesosphere.io> wrote:
>>>>>> >> Are you suggesting to have shuffle service persist and fetch data
>>>>>> with
>>>>>> >> hdfs, or skip shuffle service altogether and just write to hdfs?
>>>>>> >
>>>>>> > Skip shuffle service altogether.  Write to HDFS.
>>>>>> >
>>>>>> > Mesos environments tend to be multi-tenant, and running the shuffle
>>>>>> service
>>>>>> > on all nodes could be extremely wasteful.  If you're running a 10K
>>>>>> node
>>>>>> > cluster, and you'd like to run a Spark job that consumes 100 nodes,
>>>>>> you
>>>>>> > would have to run the shuffle service on all 10K nodes out of band
>>>>>> of Spark
>>>>>> > (e.g. marathon).  I'd like a solution for dynamic allocation that
>>>>>> doesn't
>>>>>> > require this overhead.
>>>>>> >
>>>>>> > I'll look at SPARK-1529.
>>>>>> >
>>>>>> > On Wed, Apr 27, 2016 at 10:24 AM, Steve Loughran <
>>>>>> ste...@hortonworks.com>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >>
>>>>>> >> > On 27 Apr 2016, at 04:59, Takeshi Yamamuro <
>>>>>> linguin....@gmail.com>
>>>>>> >> > wrote:
>>>>>> >> >
>>>>>> >> > Hi, all
>>>>>> >> >
>>>>>> >> > See SPARK-1529 for related discussion.
>>>>>> >> >
>>>>>> >> > // maropu
>>>>>> >>
>>>>>> >>
>>>>>> >> I'd not seen that discussion.
>>>>>> >>
>>>>>> >> I'm actually curious about why the 15% diff in performance between
>>>>>> Java
>>>>>> >> NIO and Hadoop FS APIs, and, if it is the case (Hadoop still uses
>>>>>> the
>>>>>> >> pre-NIO libraries, *has anyone thought of just fixing Hadoop Local
>>>>>> FS
>>>>>> >> codepath?*
>>>>>> >>
>>>>>> >> It's not like anyone hasn't filed JIRAs on that ... it's just that
>>>>>> nothing
>>>>>> >> has ever got to a state where it was considered ready to adopt,
>>>>>> where
>>>>>> >> "ready" means: passes all unit and load tests against Linux, Unix,
>>>>>> Windows
>>>>>> >> filesystems. There's been some attempts, but they never quite got
>>>>>> much
>>>>>> >> engagement or support, especially as nio wasn't there properly
>>>>>> until Java 7,
>>>>>> >> —and Hadoop was stuck on java 6 support until 2015. That's no
>>>>>> longer a
>>>>>> >> constraint: someone could do the work, using the existing JIRAs as
>>>>>> starting
>>>>>> >> points.
>>>>>> >>
>>>>>> >>
>>>>>> >> If someone did do this in RawLocalFS, it'd be nice if the patch
>>>>>> also
>>>>>> >> allowed you to turn off CRC creation and checking.
>>>>>> >>
>>>>>> >> That's not only part of the overhead, it means that flush()
>>>>>> doesn't, not
>>>>>> >> until you reach the end of a CRC32 block ... so breaking what few
>>>>>> durability
>>>>>> >> guarantees POSIX offers.
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > Michael Gummelt
>>>>>> > Software Engineer
>>>>>> > Mesosphere
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Michael Gummelt
>>>>> Software Engineer
>>>>> Mesosphere
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Michael Gummelt
>>> Software Engineer
>>> Mesosphere
>>>
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>

Reply via email to