[ 
https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=258090&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-258090
 ]

ASF GitHub Bot logged work on BEAM-6766:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jun/19 20:28
            Start Date: 11/Jun/19 20:28
    Worklog Time Spent: 10m 
      Work Description: ClaireMcGinty commented on pull request #8824: 
[BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824
 
 
   This PR is a small, discrete chunk of the larger [SMB 
branch/PR](https://github.com/apache/beam/pull/8486), extracted into a smaller 
PR for easier reviewing as suggested [in the JIRA 
ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains 
read/write logic for Avro, Tensorflow, and JSON records at a per-record 
granularity. These are needed because once the PCollection is bucketed and 
sorted, we can't re-use existing sources/sinks and risk a reshuffling of the 
data, so we have to manage file reads and writes very explicitly.
   
   CC @kennknowles @nevillelyh @reuvenlax 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
 <br> [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
 
   Portable | --- | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/)
 | --- | ---
   
   See 
[.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
 for trigger phrase, status and link of all Jenkins jobs.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 258090)
    Time Spent: 1h 40m  (was: 1.5h)

> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
>                 Key: BEAM-6766
>                 URL: https://issues.apache.org/jira/browse/BEAM-6766
>             Project: Beam
>          Issue Type: Improvement
>          Components: extensions-java-join-library, io-ideas
>            Reporter: Claire McGinty
>            Assignee: Claire McGinty
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Hi! Spotify has been internally prototyping and testing an implementation of 
> the sort merge join using Beam primitives and we're interested in 
> contributing it open-source – probably to Beam's extensions package in its 
> own `smb` module or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader 
> directly (although this could theoretically be expanded to other 
> serialization formats). We'd add two transforms*, an SMB write and an SMB 
> join. 
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one 
> bucket. (the user code would have to statically specify a # of buckets... 
> hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and within each bucket perform an in-memory sort 
> on join key. If the grouped records are too large to fit in memory, fall back 
> to an external sort (although if this happens, user should probably increase 
> bucket size so every group fits in memory).
> 3) Directly write the contents of bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about hash 
> algorithm/# buckets.
> SMB join would take in the input paths for 2 or more Sources, all of which 
> are written in a bucketed and partitioned way, and :
> 1) Verify that the metadata files have compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the 
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially 
> read files one record at a time, outputting tuples of all record pairs with 
> matching join key.
>  \* These could be implemented either directly as `PTransforms` with the 
> writer being a `DoFn` but I semantically do like the idea of extending 
> `FileBasedSource`/`Sink` with abstract classes like 
> `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a 
> sink as KV pairs of <Bucket #, <Sorted Iterable of Records>>>, so that the # 
> of elements in the PCollection == # of buckets == # of output files, we could 
> just implement something like `SortedBucketSink` extending `FileBasedSink` 
> with a dynamic file naming function. I'd like to be able to take advantage of 
> the existing write/read implementation logic in the `io` package as much as 
> possible although I guess some of those are package private. 
> –
> From our internal testing, we've seen some substantial performance 
> improvements using the right bucket size--not only by avoiding a shuffle 
> during the join step, but also in storage costs, since we're getting better 
> compression in Avro by storing sorted records.
> Please let us know what you think/any concerns we can address! Our 
> implementation isn't quite production-ready yet, but we'd like to start a 
> discussion about it early.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to