[
https://issues.apache.org/jira/browse/BEAM-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Beam JIRA Bot updated BEAM-6766:
--------------------------------
Labels: (was: stale-P2)
> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
> Issue Type: Improvement
> Components: extensions-java-join-library, io-ideas
> Reporter: Claire McGinty
> Priority: P3
> Time Spent: 11h 50m
> Remaining Estimate: 0h
>
> Design doc:
> https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit#
> Hi! Spotify has been internally prototyping and testing an implementation of
> the sort merge join using Beam primitives and we're interested in
> contributing it open-source – probably to Beam's extensions package in its
> own `smb` module or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader
> directly (although this could theoretically be expanded to other
> serialization formats). We'd add two transforms*, an SMB write and an SMB
> join.
> SMB write would take in one PCollection and a # of buckets and (steps 1-2
> are sketched after this list):
> 1) Apply a partitioning function to the input to assign each record to one
> bucket. (The user code would have to statically specify the # of buckets;
> it's hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and, within each bucket, perform an in-memory
> sort on the join key. If the grouped records are too large to fit in memory,
> fall back to an external sort (although if this happens, the user should
> probably increase the # of buckets so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about hash
> algorithm/# buckets.
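>
> To make steps 1-2 concrete, here's a minimal sketch using plain Beam
> primitives. It uses String join keys and String payloads to keep the example
> self-contained; `bucketAndSort`, `AssignBucketFn`, and `SortBucketFn` are
> hypothetical names, and a real implementation would use the configurable
> hash function recorded in the metadata file rather than `hashCode()`:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.coders.ListCoder;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.coders.VarIntCoder;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.GroupByKey;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
>
> public class SmbWriteSketch {
>
>   // Step 1: deterministically assign each (joinKey, record) pair to a bucket.
>   static class AssignBucketFn
>       extends DoFn<KV<String, String>, KV<Integer, KV<String, String>>> {
>     private final int numBuckets; // fixed statically at pipeline construction
>
>     AssignBucketFn(int numBuckets) {
>       this.numBuckets = numBuckets;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       int bucket = Math.floorMod(c.element().getKey().hashCode(), numBuckets);
>       c.output(KV.of(bucket, c.element()));
>     }
>   }
>
>   // Step 2: in-memory sort of one bucket's records by join key. A real
>   // implementation would fall back to an external sort for oversized buckets.
>   static class SortBucketFn
>       extends DoFn<KV<Integer, Iterable<KV<String, String>>>,
>                    KV<Integer, List<KV<String, String>>>> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       List<KV<String, String>> records = new ArrayList<>();
>       c.element().getValue().forEach(records::add);
>       records.sort((a, b) -> a.getKey().compareTo(b.getKey()));
>       c.output(KV.of(c.element().getKey(), records));
>     }
>   }
>
>   static PCollection<KV<Integer, List<KV<String, String>>>> bucketAndSort(
>       PCollection<KV<String, String>> input, int numBuckets) {
>     return input
>         .apply("AssignBucket", ParDo.of(new AssignBucketFn(numBuckets)))
>         .apply("GroupByBucket", GroupByKey.<Integer, KV<String, String>>create())
>         .apply("SortWithinBucket", ParDo.of(new SortBucketFn()))
>         .setCoder(KvCoder.of(
>             VarIntCoder.of(),
>             ListCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
>   }
> }
> {code}
> The output then has exactly one element per bucket, which lines up with the
> one-file-per-bucket write in step 3.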
> SMB join would take in the input paths for 2 or more Sources, all of which
> are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially
> read the files one record at a time, outputting tuples of all record pairs
> with matching join keys (the merge is sketched below).
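>
> Here's a sketch of the merge in step 3, assuming 2 sources for simplicity
> (the n-ary case generalizes, e.g. with a priority queue over readers).
> `mergeJoin` and `nextRun` are hypothetical names, the iterators stand in for
> the per-bucket file readers, and `PeekingIterator` comes from Guava:
> {code:java}
> import com.google.common.collect.Iterators;
> import com.google.common.collect.PeekingIterator;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.function.BiConsumer;
> import org.apache.beam.sdk.values.KV;
>
> public class SortedMergeJoinSketch {
>
>   // Drains the run of consecutive records sharing the given join key.
>   private static <V> List<V> nextRun(PeekingIterator<KV<String, V>> it, String key) {
>     List<V> run = new ArrayList<>();
>     while (it.hasNext() && it.peek().getKey().equals(key)) {
>       run.add(it.next().getValue());
>     }
>     return run;
>   }
>
>   // Streams once through two bucket files already sorted by join key,
>   // emitting every record pair whose keys match.
>   static <L, R> void mergeJoin(
>       Iterator<KV<String, L>> leftSorted,
>       Iterator<KV<String, R>> rightSorted,
>       BiConsumer<L, R> emit) {
>     PeekingIterator<KV<String, L>> left = Iterators.peekingIterator(leftSorted);
>     PeekingIterator<KV<String, R>> right = Iterators.peekingIterator(rightSorted);
>     while (left.hasNext() && right.hasNext()) {
>       int cmp = left.peek().getKey().compareTo(right.peek().getKey());
>       if (cmp < 0) {
>         left.next(); // no possible match for this left key; advance it
>       } else if (cmp > 0) {
>         right.next();
>       } else {
>         // Matching key: emit the cross product of the two runs.
>         String key = left.peek().getKey();
>         List<L> lRun = nextRun(left, key);
>         List<R> rRun = nextRun(right, key);
>         for (L l : lRun) {
>           for (R r : rRun) {
>             emit.accept(l, r);
>           }
>         }
>       }
>     }
>   }
> }
> {code}
> Since both inputs are already sorted and co-bucketed, this runs in a single
> pass with no shuffle.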
> \* These could be implemented directly as `PTransforms`, with the writer
> being a `DoFn`, but semantically I like the idea of extending
> `FileBasedSource`/`Sink` with abstract classes like
> `SortedBucketSink`/`SortedBucketSource`. If we represent the elements in a
> sink as KV pairs of <Bucket #, <Sorted Iterable of Records>>, so that the #
> of elements in the PCollection == # of buckets == # of output files, we
> could just implement something like `SortedBucketSink` extending
> `FileBasedSink` with a dynamic file naming function. I'd like to take
> advantage of the existing write/read implementation logic in the `io`
> package as much as possible, although I guess some of those classes are
> package-private.
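>
> For illustration, the "dynamic file naming function" and the step-4 metadata
> could be as simple as the following (hypothetical names; a real
> `SortedBucketSink` would wire the naming function into `FileBasedSink` and
> serialize the metadata as JSON or similar):
> {code:java}
> public class SmbFileScheme {
>
>   // Sequential per-bucket file names, e.g. "bucket-00003-of-00128.avro",
>   // so bucket i of every source can be matched up by name at join time.
>   static String bucketFileName(int bucketId, int numBuckets) {
>     return String.format("bucket-%05d-of-%05d.avro", bucketId, numBuckets);
>   }
>
>   // The metadata written alongside the bucket files (step 4 of the write);
>   // step 1 of the join checks that these agree across all sources.
>   static class BucketMetadata {
>     final int numBuckets;
>     final String hashAlgorithm;
>
>     BucketMetadata(int numBuckets, String hashAlgorithm) {
>       this.numBuckets = numBuckets;
>       this.hashAlgorithm = hashAlgorithm;
>     }
>   }
> }
> {code}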
> –
> From our internal testing, we've seen some substantial performance
> improvements using the right bucket size: not only from avoiding a shuffle
> during the join step, but also in storage costs, since we get better
> compression in Avro by storing sorted records.
> Please let us know what you think/any concerns we can address! Our
> implementation isn't quite production-ready yet, but we'd like to start a
> discussion about it early.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)