Cool, thanks Ryan, John, and Amogh for the replies! Great to see you interested! Felix will host a Spark Scalability & Reliability Sync meeting on Dec 4 at 1pm PST. We could discuss more details there. Would you like to join?
On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <[email protected]> wrote:

We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <[email protected]> wrote:

Great work, Bo! Would love to hear the details.

On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <[email protected]> wrote:

I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <[email protected]> wrote:

Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we call it remote shuffle service, RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share our learnings with the community, and we would also like to hear feedback/suggestions on how to further improve remote shuffle performance. We can discuss more details if you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <[email protected]> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299 <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.

The motivation for this proposal is enabling fully externalized (disaggregated) shuffle service implementations. (Facebook's Cosco shuffle <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service so that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with, or are complementary to, SPARK-25299. Some or all points may be merged into that issue as appropriate.

Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services; we can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort, maybe something in the mid-January to mid-February timeframe (depending on interest and availability). I'm happy to host at our Sunnyvale, CA offices.

Proposal

Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read.
Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc.). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.

Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.

Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)

Do not require MapStatus to include block manager IDs where they are not relevant. This is captured by ShuffleBlockInfo <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.

Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that's only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)

Have serializers indicate whether they are deterministic.
This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs -- the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.

--

- Ben

--
Ryan Blue
Software Engineer
Netflix

--
John Zhuge
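
A few illustrative sketches of what the hooks in Ben's proposal could look like follow. First, the scheduler-coordination point: one possible shape for the "query the shuffle manager for the existence of shuffle data" idea, written as a rough Scala sketch. Every name here (MapOutputRef, ShuffleDataAwareness, and its methods) is invented for illustration; none of this is an existing Spark API, and a real design would need to line up with the DAG scheduler's actual bookkeeping.

// Hypothetical sketch only: these traits are NOT part of Spark's API.
// They illustrate the kind of driver-side queries the DAG scheduler could
// make before deciding to re-run map stages.

/** Identifies a map output at the granularity the scheduler understands. */
case class MapOutputRef(shuffleId: Int, mapTaskId: Long)

/** Optional capability a shuffle manager could expose to the scheduler. */
trait ShuffleDataAwareness {

  /** True if shuffle data lives outside executors/nodes, so losing an
    * executor does not by itself require recomputing its map outputs. */
  def shuffleDataIsExternal: Boolean

  /** Whether the given map output is still readable. A caching service
    * could return false here to force late recomputation. */
  def isMapOutputAvailable(ref: MapOutputRef): Boolean

  /** Whether any shuffle data registered by a given executor survives
    * that executor's loss. */
  def hasDataForExecutor(executorId: String): Boolean
}

/** Example of how a scheduler-side caller might use the capability. */
object SchedulerSideUsage {
  def shouldRecompute(mgr: AnyRef, ref: MapOutputRef): Boolean = mgr match {
    case aware: ShuffleDataAwareness =>
      // Only recompute when the service says the output is really gone.
      !aware.isMapOutputAvailable(ref)
    case _ =>
      // Fall back to today's behavior: assume executor loss lost the data.
      true
  }
}

The fallback branch keeps the current behavior, so shuffle managers that do not opt in would lose nothing.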
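
For the heartbeat (keep-alive) and reader/writer lifecycle points, a minimal sketch might look like the following. Again the trait names (ShuffleHeartbeat, CommittingShuffleWriter) and signatures are assumptions made up for this sketch, not existing interfaces; a real proposal would also have to decide how these commits relate to the storage-level commit semantics in SPARK-25299.

// Hypothetical sketch only, not Spark's ShuffleManager API.

import java.io.Closeable

/** Keep-alive hook: the driver periodically tells the external service
  * which shuffles are still referenced, so the service can scope data
  * lifetimes to jobs/applications without relying on robust teardown. */
trait ShuffleHeartbeat {
  /** Called on an interval with the shuffle IDs that are still live. */
  def reportActiveShuffles(activeShuffleIds: Set[Int]): Unit
}

/** Lifecycle/commit hooks for writers; readers would get a Closeable-style
  * hook as well so connections/streams/file handles can be recycled. */
trait CommittingShuffleWriter[K, V] extends Closeable {
  def write(records: Iterator[Product2[K, V]]): Unit

  /** Atomically publish this task's output; until commit, partial output
    * must not be visible to reducers (supports speculative execution). */
  def commit(): Unit

  /** Discard any partial output, e.g. when the task is aborted. */
  def abort(cause: Throwable): Unit
}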
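
For the MapStatus point, a data-location abstraction more general than BlockManagerId could be sketched roughly as below. The names (MapOutputLocation, ExternalServiceLocation, MapOutputMeta) are hypothetical; SPARK-25299 itself currently captures this via an optional BlockManagerId inside ShuffleBlockInfo.

// Hypothetical sketch only: a location abstraction broader than BlockManagerId.

/** Where a map output can be fetched from, as far as the scheduler cares. */
sealed trait MapOutputLocation

/** Classic case: data sits on a specific executor's block manager. */
case class ExecutorLocation(executorId: String, host: String, port: Int)
  extends MapOutputLocation

/** Disaggregated case: data is owned by an external service; the scheduler
  * gets no locality preference and should not tie the output's fate to
  * any single executor. */
case class ExternalServiceLocation(serviceUri: String) extends MapOutputLocation

/** Data is tracked purely by the shuffle manager; no location is exposed. */
case object UnknownLocation extends MapOutputLocation

/** A MapStatus-like record carrying the generalized location. */
case class MapOutputMeta(
    mapTaskId: Long,
    location: MapOutputLocation,
    partitionSizes: Array[Long])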
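
Finally, the two serialization points (writing an arbitrary number of objects into an existing OutputStream or ByteBuffer, and a determinism flag) could take roughly this shape. Spark's actual Serializer/SerializerInstance API differs; the trait below is only meant to show the surface being asked for.

// Hypothetical sketch only; Spark's real serializer API is different.

import java.io.OutputStream
import java.nio.ByteBuffer

trait FlexibleSerializerInstance {

  /** Append one object to a caller-owned stream, so per-record framing or
    * metadata can be written around it cheaply (today this is only
    * practical at whole-stream granularity). */
  def writeObject[T](obj: T, out: OutputStream): Unit

  /** Same idea for a caller-owned (possibly direct) buffer. */
  def writeObject[T](obj: T, buf: ByteBuffer): Unit

  /** True if equal objects always serialize to identical bytes. When this
    * holds, an external service can group/merge records by raw bytes
    * without understanding data types, and reducers can avoid spilling
    * while combining inputs. */
  def isDeterministic: Boolean
}

A shuffle service could check isDeterministic before deciding to merge records by raw bytes; serializers that cannot make the guarantee (Java serialization, for example) would return false and keep today's behavior.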
