That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bobyan...@gmail.com> wrote:
> Cool, thanks Ryan, John, and Amogh for the replies! Great to see you all interested! Felix will host a Spark Scalability & Reliability Sync meeting on Dec 4 at 1pm PST. We could discuss more details there. Do you want to join?
>
> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <amo...@qubole.com> wrote:
>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.
>>
>> Regards,
>> Amogh
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jzh...@apache.org> wrote:
>>
>>> Great work, Bo! Would love to hear the details.
>>>
>>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>
>>>> I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!
>>>>
>>>> rb
>>>>
>>>> On Tue, Nov 19, 2019 at 2:43 PM bo yang <bobyan...@gmail.com> wrote:
>>>>
>>>>> Hi Ben,
>>>>>
>>>>> Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we call it remote shuffle service, or RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We could share our learnings with the community, and we would also like to hear feedback/suggestions on how to further improve remote shuffle performance. We could chat about the details if you or other people are interested.
>>>>>
>>>>> Best,
>>>>> Bo
>>>>>
>>>>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <sid...@google.com.invalid> wrote:
>>>>>
>>>>>> I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299 <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.
>>>>>>
>>>>>> The motivation for this proposal is enabling fully externalized (disaggregated) shuffle service implementations. (Facebook’s Cosco shuffle <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service, so that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with, or are complementary to, SPARK-25299; some or all points may be merged into that issue as appropriate.
>>>>>>
>>>>>> Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services; we can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort, maybe something in the mid-January to mid-February timeframe (depending on interest and availability).
>>>>>> I’m happy to host at our Sunnyvale, CA offices.
>>>>>>
>>>>>> Proposal
>>>>>>
>>>>>> Scheduling and re-executing tasks
>>>>>>
>>>>>> Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Having such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation if the service internally acts as a cache. One catchall solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option: allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc.). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details for some shuffle managers.
>>>>>>
>>>>>> ShuffleManager API
>>>>>>
>>>>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows that data is still active. This is one way to enable time-/job-scoped data, because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a distinct lifecycle from the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.
>>>>>>
>>>>>> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles as well as to provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.
>>>>>>
>>>>>> Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, then the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)
>>>>>>
>>>>>> Do not require MapStatus to include BlockManager IDs where they are not relevant. This is captured by ShuffleBlockInfo <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence.
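>>>>>> To make the above concrete, here is a minimal sketch of what such hooks could look like. It is illustrative only: the trait and method names below are assumptions and do not exist in Spark's ShuffleManager API today.
>>>>>>
>>>>>> // Illustrative sketch only: hypothetical driver-side hooks, not existing Spark APIs.
>>>>>>
>>>>>> /** Lets the DAG scheduler ask whether map output survives an executor loss,
>>>>>>  *  e.g. because it lives in a remote shuffle service, so recomputation can be skipped. */
>>>>>> trait ShuffleDataAvailability {
>>>>>>   def isMapOutputAvailable(shuffleId: Int, mapIndex: Int): Boolean
>>>>>> }
>>>>>>
>>>>>> /** Keep-alive hook so an external service with its own lifecycle can expire
>>>>>>  *  shuffle data that no live Spark application still references. */
>>>>>> trait ShuffleHeartbeat {
>>>>>>   /** Called periodically on the driver with the shuffles that are still active. */
>>>>>>   def reportActiveShuffles(activeShuffleIds: Set[Int]): Unit
>>>>>> }
>>>>>>
>>>>>> /** Lifecycle/commit hooks a shuffle writer could implement so that connections,
>>>>>>  *  streams, and file handles are released and output is committed atomically. */
>>>>>> trait ShuffleWriterLifecycle {
>>>>>>   def commit(): Unit // make written partitions visible to readers
>>>>>>   def close(): Unit  // release any resources held by this writer
>>>>>> }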
>>>>>> Serialization
>>>>>>
>>>>>> Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized to direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that’s only possible at the stream level. (There are hacks around this, but this would enable more idiomatic use in efficient shuffle implementations.)
>>>>>>
>>>>>> Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service because it means that reducers do not need to spill to disk when reading/merging/combining inputs: the data can be grouped by the service, even without the service understanding data types or byte representations. Alternative (less preferable since it would break Java serialization, for example): require all serializers to be deterministic.
>>>>>>
>>>>>> --
>>>>>> - Ben
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>
>>> --
>>> John Zhuge
--
John Zhuge
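The following is a minimal sketch illustrating the serializer changes proposed in the email above (appending records to a caller-owned stream, plus a determinism flag). The trait, object, and method names are assumptions for illustration only; they are not Spark's actual Serializer API.

import java.io.{DataOutputStream, OutputStream}

// Illustrative sketch only: a hypothetical serializer surface, not Spark's Serializer API.
trait StreamFriendlySerializer {
  /** Append records to a caller-owned stream without closing it, so the caller
   *  can interleave its own framing/metadata between individual records. */
  def writeAll[T](records: Iterator[T], out: OutputStream): Unit

  /** True when equal objects always produce identical bytes, which lets a remote
   *  shuffle service group/merge records without deserializing them. */
  def isDeterministic: Boolean
}

/** Toy implementation: length-prefixed UTF-8 strings. */
object Utf8StringSerializer extends StreamFriendlySerializer {
  override def writeAll[T](records: Iterator[T], out: OutputStream): Unit = {
    val data = new DataOutputStream(out)
    records.foreach { r =>
      val bytes = r.toString.getBytes("UTF-8")
      data.writeInt(bytes.length) // simple per-record length framing
      data.write(bytes)
    }
    data.flush() // flush, but leave the caller's stream open
  }

  override def isDeterministic: Boolean = true
}

A deterministic serializer like this one is what would let a remote service group equal records by byte comparison alone, without understanding the data types or byte representations involved.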