Claire McGinty created BEAM-6766:
------------------------------------

             Summary: Sort Merge Bucket Join support in Beam
                 Key: BEAM-6766
                 URL: https://issues.apache.org/jira/browse/BEAM-6766
             Project: Beam
          Issue Type: Improvement
          Components: io-ideas, sdk-java-join-library
            Reporter: Claire McGinty


Hi! Spotify has been internally prototyping and testing an implementation of 
the sort merge join using Beam primitives and we're interested in contributing 
it open-source – probably to Beam's extensions/join-library package?

We've tested this with Avro files using Avro's GenericDatumWriter/Reader 
directly (although this could theoretically be expanded to other serialization 
types). We'd add two transforms, an SMB write and an SMB join. 

SMB write would take in one PCollection and a # of buckets and:

1) Apply a partitioning function to the input to assign each record to one 
bucket.

2) Group by that bucket ID and within each bucket perform an in-memory sort on 
join key. If the grouped records are too large to fit in memory, fall back to 
an external sort (although if this happens, user should probably increase 
bucket size so every group fits in memory).

3) Directly write the contents of bucket to a sequentially named file.

4) Write a metadata file to the same output path with info about hash 
algorithm/# buckets.

SMB join would take in the input paths for 2 or more Sources, all of which are 
written in a bucketed and partitioned way, and :

1) Verify that the metadata files have compatible bucket # and hash algorithm.

2) Expand the input paths to enumerate the `ResourceIds` of every file in the 
paths. Group all inputs with the same bucket ID.

3) Within each group, open a file reader on all `ResourceIds`. Sequentially 
read files one record at a time, outputting tuples of all record pairs with 
matching join key.

--

We've seen some substantial performance improvements using the right bucket 
size--not only by avoiding a shuffle during the join step, but also in storage 
costs, since we're getting better compression in Avro by storing sorted records.

Please let us know what you think/any concerns we can address! Our 
implementation isn't quite production-ready yet, but we'd like to start a 
discussion about it early.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to