[ 
https://issues.apache.org/jira/browse/BEAM-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16795093#comment-16795093
 ] 

Claire McGinty commented on BEAM-6766:
--------------------------------------

cc [~reuvenlax], can you share your thoughts when you have a chance? :) 

> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
>                 Key: BEAM-6766
>                 URL: https://issues.apache.org/jira/browse/BEAM-6766
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-ideas, sdk-java-join-library
>            Reporter: Claire McGinty
>            Priority: Major
>
> Hi! Spotify has been internally prototyping and testing an implementation of 
> the sort merge join using Beam primitives, and we're interested in 
> contributing it as open source, probably to Beam's extensions/join-library 
> package.
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader 
> directly (although this could theoretically be expanded to other 
> serialization types). We'd add two transforms, an SMB write and an SMB join. 
> SMB write would take in one PCollection and a number of buckets, and:
> 1) Apply a partitioning function to the input to assign each record to one 
> bucket.
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort 
> on the join key. If the grouped records are too large to fit in memory, fall 
> back to an external sort (although if this happens, the user should probably 
> increase the number of buckets so that every group fits in memory).
> 3) Write the contents of each bucket directly to a sequentially named file.
> 4) Write a metadata file to the same output path recording the hash 
> algorithm and the number of buckets.
> SMB join would take in the input paths for 2 or more Sources, all of which 
> were written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have compatible bucket counts and hash 
> algorithms.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the 
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially 
> read the files one record at a time, outputting tuples of all record pairs 
> with matching join keys.
> --
> We've seen some substantial performance improvements using the right bucket 
> size, not only from avoiding a shuffle during the join step, but also in 
> storage costs, since we get better compression in Avro by storing 
> sorted records.
> Please let us know what you think/any concerns we can address! Our 
> implementation isn't quite production-ready yet, but we'd like to start a 
> discussion about it early.
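
For readers following the thread, the write and join steps quoted above can be sketched in plain Python. This is not Beam code and not the implementation described in the issue: buckets are kept in memory instead of Avro files, and the CRC32 hash, the metadata layout, and every function name (`bucket_id`, `smb_write`, `merge_join`, `smb_join`) are assumptions made only for illustration.

```python
import zlib
from collections import defaultdict
from itertools import groupby, product
from operator import itemgetter


def bucket_id(key, num_buckets):
    # Stable hash so both datasets route equal keys to the same bucket.
    # CRC32 is an assumption; the issue does not name a hash algorithm.
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def smb_write(records, num_buckets):
    """Partition (key, value) records into buckets and sort each by key.

    Returns (metadata, buckets); a real writer would emit one file per
    bucket plus a metadata file instead of in-memory structures.
    """
    buckets = defaultdict(list)
    for key, value in records:
        buckets[bucket_id(key, num_buckets)].append((key, value))
    # In-memory sort per bucket; very large groups would need an external sort.
    sorted_buckets = {
        b: sorted(rows, key=itemgetter(0)) for b, rows in buckets.items()
    }
    metadata = {"num_buckets": num_buckets, "hash": "crc32"}
    return metadata, sorted_buckets


def merge_join(left, right):
    """Inner-join two key-sorted lists in a single sequential pass."""
    done = (None, None)
    left_groups = groupby(left, key=itemgetter(0))
    right_groups = groupby(right, key=itemgetter(0))
    lkey, lrows = next(left_groups, done)
    rkey, rrows = next(right_groups, done)
    while lrows is not None and rrows is not None:
        if lkey < rkey:
            lkey, lrows = next(left_groups, done)
        elif lkey > rkey:
            rkey, rrows = next(right_groups, done)
        else:
            # Equal keys: emit every pairing of the two groups.
            for (_, lval), (_, rval) in product(list(lrows), list(rrows)):
                yield lkey, lval, rval
            lkey, lrows = next(left_groups, done)
            rkey, rrows = next(right_groups, done)


def smb_join(left_dataset, right_dataset):
    """Check metadata compatibility, then merge-join bucket by bucket."""
    left_meta, left_buckets = left_dataset
    right_meta, right_buckets = right_dataset
    if left_meta != right_meta:
        raise ValueError("incompatible bucket count or hash algorithm")
    # Matching keys can only live in buckets with the same ID, so no
    # cross-bucket comparison (and hence no shuffle) is needed.
    for b in sorted(left_buckets.keys() & right_buckets.keys()):
        yield from merge_join(left_buckets[b], right_buckets[b])
```

Calling `list(smb_join(smb_write(users, 4), smb_write(orders, 4)))` on two lists of `(key, value)` pairs yields one `(key, left_value, right_value)` tuple per matching pair. Because both sides were bucketed with the same hash and bucket count, each bucket can be joined independently of the others.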



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
