Thanks Robert! We'd definitely like to be able to reuse existing I/O components -- for example the Writer<DestinationT, OutputT> <https://github.com/apache/beam/blob/a2b0ad14f1525d1a645cb26f5b8ec45692d9d54e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java#L76> / FileBasedReader<T> -- since they operate on a WritableByteChannel/ReadableByteChannel, which is exactly the level of granularity we need. But the Writers, at least, seem to be mostly private-access. Do you foresee them being made public at any point?
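To illustrate the granularity we mean, here's a rough hypothetical sketch of an SMB sink handing a per-bucket WritableByteChannel to a pluggable format writer. The BucketFileWriter/SmbBucketSink names are made up for illustration only -- this isn't the existing Beam API, since FileBasedSink's Writer is package-private today:

    // Hypothetical sketch only: what reusing a channel-based format writer
    // could look like if the Writer-level APIs were public. Names are
    // illustrative, not part of Beam.
    import java.nio.channels.WritableByteChannel;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.util.MimeTypes;

    // A format-specific writer (e.g. Avro) that only needs a byte channel.
    interface BucketFileWriter<T> {
      void open(WritableByteChannel channel) throws Exception;
      void write(T record) throws Exception; // records arrive sorted by join key
      void close() throws Exception;
    }

    // An SMB sink would open one channel per (bucket, shard) file and
    // delegate all encoding to the format writer.
    class SmbBucketSink<T> {
      private final BucketFileWriter<T> formatWriter;

      SmbBucketSink(BucketFileWriter<T> formatWriter) {
        this.formatWriter = formatWriter;
      }

      void writeBucket(ResourceId bucketFile, Iterable<T> sortedRecords) throws Exception {
        try (WritableByteChannel channel = FileSystems.create(bucketFile, MimeTypes.BINARY)) {
          formatWriter.open(channel);
          for (T record : sortedRecords) {
            formatWriter.write(record);
          }
          formatWriter.close();
        }
      }
    }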
- Claire

On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <rober...@google.com> wrote:
> I left some comments on the doc.
>
> I think the general idea is sound, but one thing that worries me is the introduction of a parallel set of IOs that mirrors the (existing) FileIOs. I would suggest either (1) incorporating this functionality into the generic FileIO infrastructure, (2) letting it be parameterized by arbitrary IO (which I'm not sure is possible, especially for the Read side; better would be the capability of supporting arbitrary sources, aka an optional "as-sharded-source" operation that returns a PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the iterable is promised to be in key order), or (3) supporting a single SMB aka "PreGrouping" source/sink pair that's always used together (and whose underlying format is not necessarily public).
>
> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <neville....@gmail.com> wrote:
> >
> > 4 people have commented, but mostly on clarifying details and not much on the overall design.
> >
> > It'd be great to have a thumbs up/down on the design, specifically metadata, bucket & shard strategy, etc., since that affects the backwards compatibility of output files. Some breaking changes, e.g. a dynamic # of shards, are out of scope for V1 unless someone feels strongly about it. The current scope should cover all our use cases and leave room for optimization.
> >
> > Once green-lighted, we can start adopting internally, ironing out rough edges while iterating on the PRs in parallel.
> >
> > Most of the implementation is self-contained in the extensions:smb module, except for making a few core classes/methods public for reuse. So despite the amount of work, it's still fairly low risk to the code base. There are some proposed optimizations & refactorings involving core (see appendix), but IMO they're better left for follow-up PRs.
> >
> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <k...@apache.org> wrote:
> >>
> >> I've seen some discussion on the doc. I cannot tell whether the questions are resolved or what the status of the review is. Would you mind looping in this thread with a quick summary? This is such a major piece of work that I don't want it to sit with everyone thinking they are waiting on someone else, or any such thing. (Not saying this is happening, just pinging to be sure.)
> >>
> >> Kenn
> >>
> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <neville....@gmail.com> wrote:
> >>>
> >>> Updated the doc a bit with more future work (appendix). IMO most of it is non-breaking and better done in separate PRs later, since some of it involves pretty big refactoring and is outside the scope of the MVP.
> >>>
> >>> For now we'd really like to get feedback on some fundamental design decisions and find a way to move forward.
> >>>
> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <neville....@gmail.com> wrote:
> >>>>
> >>>> Thanks. I responded to comments in the doc. More inline.
> >>>>
> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <chamik...@google.com> wrote:
> >>>>>
> >>>>> Thanks, added a few comments.
> >>>>>
> >>>>> If I understood correctly, you basically assign elements with keys to different buckets, which are written to unique files, and merge files for the same key while reading?
> >>>>>
> >>>>> Some of my concerns are:
> >>>>>
> >>>>> (1) It seems like you rely on an in-memory sorting of buckets. Will this end up limiting the size of a PCollection you can process?
> >>>>
> >>>> The sorter transform we're using supports spilling and external sort. We can break up large key groups further by sharding, similar to fan-out in some GBK transforms.
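(For reference, a minimal sketch of what that external sort looks like with Beam's sorter extension, org.apache.beam.sdk.extensions.sorter; the key/value types and memory limit below are illustrative, not the actual extensions:smb code:)

    // Minimal sketch of sorting values within each key group with the
    // BufferedExternalSorter, which spills to local disk past its memory
    // limit instead of buffering whole groups in memory. Requires the
    // beam-sdks-java-extensions-sorter dependency.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
    import org.apache.beam.sdk.extensions.sorter.SortValues;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class ExternalSortSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // bucketId -> (sortKey, payload), as an SMB write might see them.
        PCollection<KV<Integer, KV<String, String>>> input =
            p.apply(Create.of(
                KV.of(1, KV.of("b", "rec2")),
                KV.of(1, KV.of("a", "rec1")),
                KV.of(2, KV.of("c", "rec3"))));

        // Group by bucket, then sort each group by the secondary key
        // (lexicographically, by its encoded bytes). The sorted iterables
        // could then be written out as bucket/shard files.
        PCollection<KV<Integer, Iterable<KV<String, String>>>> sortedBuckets =
            input
                .apply(GroupByKey.<Integer, KV<String, String>>create())
                .apply(SortValues.<Integer, String, String>create(
                    BufferedExternalSorter.options().withMemoryMB(128)));

        p.run().waitUntilFinish();
      }
    }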
> >>>>
> >>>>> (2) It seems like you rely on Reshuffle.viaRandomKey(), which is actually implemented using a shuffle (which you're trying to replace with this proposal).
> >>>>
> >>>> That's for distributing task metadata, so that each DoFn thread picks up a random bucket and sort-merges its key-values. It's not shuffling the actual data.
> >>>>
> >>>>> (3) I think (at least some of the) shuffle implementations work in ways similar to this (writing to files and merging). So I'm wondering if the performance benefits you see are for a very specific case and may limit the functionality in other ways.
> >>>>
> >>>> This is for the common pattern of a few core data-producer pipelines and many downstream consumer pipelines. It's not intended to replace shuffle/join within a single pipeline. On the producer side, by pre-grouping/sorting data and writing to bucket/shard output files, the consumer can sort/merge matching ones without a CoGBK. Essentially we're paying the shuffle cost upfront to avoid paying it repeatedly in every consumer pipeline that wants to join the data.
> >>>>
> >>>>> Thanks,
> >>>>> Cham
> >>>>>
> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <neville....@gmail.com> wrote:
> >>>>>>
> >>>>>> Ping again. Any chance someone could take a look to get this thing going? It's just a design doc and a basic metadata/IO impl. We're not talking about actual source/sink code yet (already done, but saved for future PRs).
> >>>>>>
> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <al...@google.com> wrote:
> >>>>>>>
> >>>>>>> Thank you Claire, this looks promising. Explicitly adding a few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw +Lukasz Cwik +Chamikara Jayalath
> >>>>>>>
> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hey dev@!
> >>>>>>>>
> >>>>>>>> A few other Spotify data engineers and I have put together a design doc for SMB Join support in Beam, and we have a working Java implementation we've started to put up for PR ([0], [1], [2]). There's more detailed information in the document, but the tl;dr is that SMB is a strategy to optimize joins for file-based sources by modifying the initial write operation to write records in sorted buckets based on the desired join key. This means that subsequent joins of datasets written in this way are only sequential file reads, with no shuffling involved. We've seen some pretty substantial performance speedups with our implementation and would love to get it checked in to Beam's Java SDK.
> >>>>>>>>
> >>>>>>>> We'd appreciate any suggestions or feedback on our proposal -- the design doc should be public to comment on.
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> Claire / Neville
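(The bucket-assignment idea in the tl;dr above can be sketched roughly as follows; the class name, key types, and hash choice are illustrative, not the actual extensions:smb implementation:)

    // Illustrative only: records with the same join key always get the same
    // bucket id, so bucket i of dataset A only ever needs to be merged with
    // bucket i of dataset B. Within each bucket file, records are written in
    // sorted key order, making the later join a sequential merge, no shuffle.
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class AssignBucketFn extends DoFn<KV<String, String>, KV<Integer, KV<String, String>>> {
      private final int numBuckets;

      AssignBucketFn(int numBuckets) {
        this.numBuckets = numBuckets;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // All producer pipelines must agree on the hash function and bucket
        // count for their outputs to be join-compatible; a real implementation
        // would use a stable hash (e.g. Murmur3) rather than hashCode().
        int bucketId = Math.floorMod(c.element().getKey().hashCode(), numBuckets);
        c.output(KV.of(bucketId, c.element()));
      }
    }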