[
https://issues.apache.org/jira/browse/BEAM-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16795093#comment-16795093
]
Claire McGinty commented on BEAM-6766:
--------------------------------------
cc [~reuvenlax], can you share your thoughts when you have a chance? :)
> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas, sdk-java-join-library
> Reporter: Claire McGinty
> Priority: Major
>
> Hi! Spotify has been internally prototyping and testing an implementation of
> the sort-merge bucket join using Beam primitives, and we're interested in
> contributing it as open source, probably to Beam's extensions/join-library
> package?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader
> directly (although this could theoretically be expanded to other
> serialization types). We'd add two transforms: an SMB write and an SMB join.
> SMB write would take in one PCollection and a number of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one
> bucket.
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort
> on the join key. If the grouped records are too large to fit in memory, fall
> back to an external sort (although if this happens, the user should probably
> increase the bucket count so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash
> algorithm and number of buckets (see the sketch below).
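> For illustration, here's a rough, hypothetical sketch of that write path
> using stock Beam primitives. The "userId" join key, the hash choice, and
> writeBucketFile() are illustrative placeholders, not our actual
> implementation; it also assumes the input PCollection already has an
> AvroCoder set.
> {code:java}
> import java.util.ArrayList;
> import java.util.Comparator;
> import java.util.List;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.GroupByKey;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TypeDescriptors;
>
> /** Hypothetical sketch of SMB write; helper names are placeholders. */
> static void smbWrite(PCollection<GenericRecord> input, int numBuckets) {
>   input
>       // 1) Assign each record to a bucket by hashing its join key.
>       .apply(WithKeys.of((GenericRecord r) ->
>               Math.floorMod(r.get("userId").hashCode(), numBuckets))
>           .withKeyType(TypeDescriptors.integers()))
>       // 2) Group by bucket ID so each bucket is sorted and written as a unit.
>       .apply(GroupByKey.create())
>       .apply(ParDo.of(new DoFn<KV<Integer, Iterable<GenericRecord>>, Void>() {
>         @ProcessElement
>         public void process(ProcessContext c) {
>           List<GenericRecord> records = new ArrayList<>();
>           c.element().getValue().forEach(records::add);
>           // In-memory sort on the join key; an external sort would be the
>           // fallback for buckets too large to buffer.
>           records.sort(Comparator.comparing(
>               (GenericRecord r) -> r.get("userId").toString()));
>           // 3) writeBucketFile() is a hypothetical helper that streams the
>           // sorted records to a sequentially named file (e.g.
>           // bucket-00007-of-00128.avro) via GenericDatumWriter.
>           writeBucketFile(c.element().getKey(), numBuckets, records);
>         }
>       }));
>   // 4) A final step writes a metadata file to the same output path, e.g.
>   // {"hashAlgorithm": "murmur3_32", "numBuckets": 128}.
> }
> {code}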
> SMB join would take in the input paths for two or more sources, all of which
> are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have compatible bucket counts and hash
> algorithms.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially
> read the files one record at a time, outputting tuples of all record pairs
> with matching join keys (see the merge sketch below).
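> As a concrete picture of the merge in step 3, here's a minimal, hypothetical
> per-bucket merge over two already-sorted record iterators. Opening the
> readers is elided; joinKey() and the "userId" field are illustrative
> assumptions, and PeekingIterator is Guava's, not a Beam API.
> {code:java}
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.function.Consumer;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.beam.sdk.values.KV;
> import com.google.common.collect.Iterators;
> import com.google.common.collect.PeekingIterator;
>
> /** Hypothetical merge of one bucket from two sorted sources. */
> static void mergeBucket(Iterator<GenericRecord> leftFile,
>                         Iterator<GenericRecord> rightFile,
>                         Consumer<KV<GenericRecord, GenericRecord>> emit) {
>   PeekingIterator<GenericRecord> left = Iterators.peekingIterator(leftFile);
>   PeekingIterator<GenericRecord> right = Iterators.peekingIterator(rightFile);
>   while (left.hasNext() && right.hasNext()) {
>     int cmp = joinKey(left.peek()).compareTo(joinKey(right.peek()));
>     if (cmp < 0) {
>       left.next();  // unmatched left record; skip
>     } else if (cmp > 0) {
>       right.next(); // unmatched right record; skip
>     } else {
>       // Buffer the run of equal keys on both sides, then emit the full
>       // cross product of matching pairs.
>       String key = joinKey(left.peek());
>       List<GenericRecord> lRun = drainRun(left, key);
>       List<GenericRecord> rRun = drainRun(right, key);
>       for (GenericRecord l : lRun) {
>         for (GenericRecord r : rRun) {
>           emit.accept(KV.of(l, r));
>         }
>       }
>     }
>   }
> }
>
> // Collects consecutive records whose join key equals the given key.
> static List<GenericRecord> drainRun(
>     PeekingIterator<GenericRecord> it, String key) {
>   List<GenericRecord> run = new ArrayList<>();
>   while (it.hasNext() && joinKey(it.peek()).equals(key)) {
>     run.add(it.next());
>   }
>   return run;
> }
>
> // Hypothetical key extractor; assumes a string join key named "userId".
> static String joinKey(GenericRecord r) {
>   return r.get("userId").toString();
> }
> {code}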
> --
> We've seen some substantial improvements using the right bucket count: not
> only faster joins from avoiding a shuffle during the join step, but also
> lower storage costs, since we get better compression in Avro by storing
> sorted records.
> Please let us know what you think and any concerns we can address! Our
> implementation isn't quite production-ready yet, but we'd like to start a
> discussion about it early.