Great work, Bo! Would love to hear the details.

On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> I'm interested in remote shuffle services as well. I'd love to hear about
> what you're using in production!
>
> rb
>
> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bobyan...@gmail.com> wrote:
>
>> Hi Ben,
>>
>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in
>> Seattle, working on disaggregated shuffle (we call it remote shuffle
>> service, RSS, internally). We have had RSS in production for a while and
>> learned a lot along the way (we tried quite a few techniques to improve
>> remote shuffle performance). We could share what we learned with the
>> community, and we would also like to hear feedback and suggestions on how
>> to further improve remote shuffle performance. We could discuss this in
>> more detail if you or others are interested.
>>
>> Best,
>> Bo
>>
>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <sid...@google.com.invalid>
>> wrote:
>>
>>> I would like to start a conversation about extending the Spark shuffle
>>> manager surface to support fully disaggregated shuffle implementations.
>>> This is closely related to the work in SPARK-25299
>>> <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused
>>> on refactoring the shuffle manager API (and in particular,
>>> SortShuffleManager) to use a pluggable storage backend. The motivation for
>>> that SPIP is further enabling Spark on Kubernetes.
>>>
>>>
>>> The motivation for this proposal is enabling fully externalized
>>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>>> shuffle
>>> <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service>
>>> is one example of such a disaggregated shuffle service.) These changes
>>> allow the bulk of the shuffle to run in a remote service such that minimal
>>> state resides in executors and local disk spill is minimized. The net
>>> effect is increased job stability and performance improvements in certain
>>> scenarios. These changes should work well with, or be complementary to,
>>> SPARK-25299. Some or all points may be merged into that issue as
>>> appropriate.
>>>
>>>
>>> Below is a description of each component of this proposal. These changes
>>> can ideally be introduced incrementally. I would like to gather feedback
>>> and gauge interest from others in the community to collaborate on this.
>>> There are likely more points that would be useful to disaggregated shuffle
>>> services. We can outline a more concrete plan after gathering enough input.
>>> A working session could help us kick off this joint effort, maybe something
>>> in the mid-January to mid-February timeframe (depending on interest and
>>> availability). I’m happy to host at our Sunnyvale, CA offices.
>>>
>>>
>>> Proposal
>>>
>>> Scheduling and re-executing tasks
>>>
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
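
As a rough illustration of the two options above (a flag saying shuffle data is external, plus a query for whether a given map output exists), here is a minimal Python sketch. It is not Spark code: `ShuffleDataRegistry`, `has_output`, and `needs_recompute` are hypothetical names, and the real scheduler logic is considerably more involved.

```python
from dataclasses import dataclass, field
from typing import Set, Tuple


@dataclass
class ShuffleDataRegistry:
    """Hypothetical driver-side view of which map outputs are available."""

    external: bool = True  # True: shuffle data lives outside executors
    _outputs: Set[Tuple[int, int]] = field(default_factory=set)

    def register(self, shuffle_id: int, map_id: int) -> None:
        self._outputs.add((shuffle_id, map_id))

    def has_output(self, shuffle_id: int, map_id: int) -> bool:
        return (shuffle_id, map_id) in self._outputs


def needs_recompute(registry: ShuffleDataRegistry, shuffle_id: int,
                    map_id: int, executor_lost: bool) -> bool:
    """Decide whether a map partition must be recomputed."""
    if not registry.external:
        # Executor-local shuffle data disappears together with the executor.
        return executor_lost or not registry.has_output(shuffle_id, map_id)
    # External data survives executor loss; recompute only if it is missing.
    return not registry.has_output(shuffle_id, map_id)
```

The query is keyed by shuffle and map partition rather than by block ID, matching the note that block IDs are internal to some shuffle managers.
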
>>>
>>> ShuffleManager API
>>>
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>>
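
One way to picture the heartbeat idea is as a lease table on the service side: each keep-alive renews a lease, and data whose lease lapses becomes eligible for cleanup. The Python sketch below is only an illustration under that assumption. `ShuffleLeaseTable` and its methods are hypothetical names, and timestamps are passed in explicitly to keep the example deterministic.

```python
from typing import Dict, List


class ShuffleLeaseTable:
    """Hypothetical service-side lease tracking for shuffle outputs."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._last_seen: Dict[int, float] = {}

    def heartbeat(self, shuffle_id: int, now: float) -> None:
        """Record a keep-alive for a shuffle, renewing its lease."""
        self._last_seen[shuffle_id] = now

    def expire(self, now: float) -> List[int]:
        """Drop and return shuffles whose lease is older than the TTL."""
        expired = sorted(
            sid for sid, seen in self._last_seen.items()
            if now - seen > self.ttl_seconds
        )
        for sid in expired:
            del self._last_seen[sid]
        return expired
```

Because expiry depends only on the last heartbeat, the service can reclaim data even if the Spark application disappears without ever sending an explicit cleanup call.
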
>>>
>>> Add lifecycle hooks to shuffle readers and writers (e.g., to
>>> close/recycle connections/streams/file handles as well as provide commit
>>> semantics). SPARK-25299 adds commit semantics to the internal data storage
>>> layer, but this is applicable to all shuffle managers at a higher level and
>>> should apply equally to the ShuffleWriter.
>>>
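
A minimal sketch of what writer lifecycle hooks with commit semantics could look like, written in Python for brevity. `RemoteShuffleWriter` and its `commit`/`abort`/`close` methods are hypothetical, and the `sink` list merely stands in for a remote shuffle service.

```python
from typing import List


class RemoteShuffleWriter:
    """Hypothetical writer; `sink` stands in for a remote shuffle service."""

    def __init__(self, sink: List[bytes]) -> None:
        self._sink = sink
        self._pending: List[bytes] = []
        self.closed = False

    def write(self, record: bytes) -> None:
        # Buffer records until the task outcome is known.
        self._pending.append(record)

    def commit(self) -> None:
        # Publish buffered records only when the task succeeded.
        self._sink.extend(self._pending)
        self._pending.clear()

    def abort(self) -> None:
        # Discard records written by a failed task attempt.
        self._pending.clear()

    def close(self) -> None:
        # Release connections, streams, or file handles here.
        self.closed = True

    def __enter__(self) -> "RemoteShuffleWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            if exc_type is None:
                self.commit()
            else:
                self.abort()
        finally:
            self.close()
        return False  # never swallow the task's exception
```

With hooks like these, a failed task attempt leaves nothing visible in the service, and resources are released on both the success and failure paths.
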
>>>
>>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where
>>> they are not needed. Ideally, this would be an implementation detail of the
>>> shuffle manager itself. If there is substantial overlap between the
>>> SortShuffleManager and other implementations, then the storage details can
>>> be abstracted at the appropriate level. (SPARK-25299 does not currently
>>> change this.)
>>>
>>>
>>> Do not require MapStatus to include block manager IDs where they are not
>>> relevant. This is captured by ShuffleBlockInfo
>>> <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj>
>>> including an optional BlockManagerId in SPARK-25299. However, this
>>> change should be lifted to the MapStatus level so that it applies to all
>>> ShuffleManagers. Alternatively, use a more general data-location
>>> abstraction than BlockManagerId. This gives the shuffle manager more
>>> flexibility and the scheduler more information with respect to data
>>> residence.
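
To make the optional-location idea concrete, here is a small Python sketch. `MapOutputStatus` and `preferred_hosts` are hypothetical stand-ins and do not mirror Spark's actual MapStatus class. A location of `None` is assumed to mean that the output is held by an external service.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass(frozen=True)
class MapOutputStatus:
    """Hypothetical map output record with an optional data location."""

    map_id: int
    partition_sizes: Tuple[int, ...]
    location: Optional[str] = None  # None: data is held externally


def preferred_hosts(statuses: Iterable[MapOutputStatus]) -> List[str]:
    """Hosts usable for locality; externally held outputs contribute none."""
    return sorted({s.location for s in statuses if s.location is not None})
```
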
>>>
>>> Serialization
>>>
>>> Allow serializers to be used more flexibly and efficiently. For example,
>>> have serializers support writing an arbitrary number of objects into an
>>> existing OutputStream or ByteBuffer. This enables objects to be serialized
>>> to direct buffers where doing so makes sense. More importantly, it allows
>>> arbitrary metadata/framing data to be wrapped around individual objects
>>> cheaply. Right now, that’s only possible at the stream level. (There are
>>> hacks around this, but this would enable more idiomatic use in efficient
>>> shuffle implementations.)
>>>
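
The per-object framing described above can be sketched as follows. This is a Python illustration rather than Spark's serializer API: `write_framed` and `read_framed` are hypothetical helpers, `pickle` stands in for an arbitrary serializer, and the 8-byte header (partition ID plus payload length) is an assumed framing format.

```python
import io
import pickle
import struct
from typing import Any, BinaryIO, Iterator, Tuple


def write_framed(out: BinaryIO, partition_id: int, obj: Any) -> None:
    """Append one object to an existing stream, wrapped in a small header."""
    payload = pickle.dumps(obj)
    # Header: partition ID and payload length as big-endian unsigned 32-bit ints.
    out.write(struct.pack(">II", partition_id, len(payload)))
    out.write(payload)


def read_framed(inp: BinaryIO) -> Iterator[Tuple[int, Any]]:
    """Yield (partition_id, object) pairs until the stream is exhausted."""
    while True:
        header = inp.read(8)
        if not header:
            return
        partition_id, length = struct.unpack(">II", header)
        yield partition_id, pickle.loads(inp.read(length))
```

Because every object carries its own header, a service can route or regroup individual records without deserializing their payloads.
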
>>>
>>> Have serializers indicate whether they are deterministic. This provides
>>> much of the value of a shuffle service because it means that reducers do
>>> not need to spill to disk when reading/merging/combining inputs--the data
>>> can be grouped by the service, even without the service understanding data
>>> types or byte representations. Alternative (less preferable since it would
>>> break Java serialization, for example): require all serializers to be
>>> deterministic.
>>>
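
A small Python sketch of why a determinism flag matters: if equal keys always serialize to identical bytes, records can be grouped on the raw bytes without understanding the data types. The `deterministic` attribute, the two serializer classes, and `group_by_key_bytes` are all hypothetical names chosen for illustration.

```python
import json
import pickle
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


class JsonSortedSerializer:
    """Equal keys always produce identical bytes (dict keys are sorted)."""

    deterministic = True

    def dumps(self, obj: Any) -> bytes:
        return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


class PickleSerializer:
    """Equal dicts built in a different order pickle to different bytes."""

    deterministic = False

    def dumps(self, obj: Any) -> bytes:
        return pickle.dumps(obj)


def group_by_key_bytes(serializer: Any,
                       records: Iterable[Tuple[Any, Any]]) -> Dict[bytes, List[Any]]:
    """Group values by serialized key, as a type-agnostic service might."""
    if not serializer.deterministic:
        raise ValueError("byte-level grouping needs a deterministic serializer")
    groups: Dict[bytes, List[Any]] = defaultdict(list)
    for key, value in records:
        groups[serializer.dumps(key)].append(value)
    return dict(groups)
```

With a non-deterministic serializer, two equal keys could land in different groups, which is why the grouping function refuses to run without the flag.
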
>>>
>>>
>>> --
>>>
>>> - Ben
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
John Zhuge
