Is there a reason that you need to explicitly specify the number of shards? If you don't, then this extra shuffle will not be performed.
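A standalone sketch (plain Python, deliberately outside the Beam API) of the extra work that a fixed shard count implies, as described later in the thread: assign a random shard key to each record, group by that key (the extra shuffle), then write one file per shard. The function and the `write_file` callback are illustrative names, not Beam's.

```python
import random
from collections import defaultdict

def write_sharded(records, num_shards, write_file):
    # Step 1: assign a shard key to each record. Forcing a specific
    # shard count makes this step necessary; letting the runner pick
    # the sharding avoids it (and the shuffle below).
    keyed = [(random.randrange(num_shards), r) for r in records]

    # Step 2: group by shard key to colocate records -- this is the
    # extra shuffle being discussed.
    groups = defaultdict(list)
    for shard, record in keyed:
        groups[shard].append(record)

    # Step 3: write each shard's records out as one file.
    for shard, shard_records in groups.items():
        write_file(shard, shard_records)
```

The point of the sketch is only that a fixed `num_shards` forces the assign-then-group sequence; with runner-determined sharding, workers can write whatever they have without the grouping step.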
Reuven

On Fri, Sep 27, 2019 at 12:12 PM Shannon Duncan <joseph.dun...@liveramp.com> wrote:

> Interesting. Right now we are only doing batch processing, so I hadn't
> thought about the windowing aspect.
>
> On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax <re...@google.com> wrote:
>
>> Are you doing this in streaming with windowed writes? Window grouping
>> does not "happen" in Beam until a GroupByKey, so you do need the
>> GroupByKey in that case.
>>
>> If you are not windowing but want a specific number of shards (though
>> the general suggestion in that case is not to pick a specific number of
>> shards, but to let the runner pick it for you), your approach could work.
>> However, the implementation would be more complicated than you suggest.
>> The problem is that every file writer has a buffer, and when you force
>> many of them to be in memory in a map you risk running out of memory. If
>> you look at the spilledFiles code in WriteFiles.java, it was written to
>> handle exactly this case.
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>
>>> Yes, specifically TextIO withNumShards().
>>>
>>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'm not sure what you mean by "write out to a specific shard number."
>>>> Are you talking about FileIO sinks?
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> As I understand it, when Beam writes out to a specific number of
>>>>> shards it does a few things:
>>>>>
>>>>> - Assigns a shard key to each record (reduces parallelism)
>>>>> - Shuffles and groups by the shard key to colocate all records
>>>>> - Writes out to each shard file within a single DoFn per key
>>>>>
>>>>> When thinking about this, I believe we might be able to eliminate the
>>>>> GroupByKey and write each file's records with only a DoFn, once the
>>>>> shard key is assigned.
>>>>>
>>>>> As long as the shard key is the actual key of the PCollection, could
>>>>> we use a state variable so that all records with the same key share
>>>>> state with each other as they process?
>>>>>
>>>>> In a DoFn, could we use setup to hold a map of the files being
>>>>> written to by bundles on that instance, and then close all files in
>>>>> the map on teardown?
>>>>>
>>>>> If so, does this remove the need for a shuffle and allow a DoFn to
>>>>> safely write out in append mode to a file, batch, etc. held in state?
>>>>>
>>>>> It doesn't really decrease parallelism after the key is assigned,
>>>>> since the runner can still parallelize over each key within its state
>>>>> window. That is the same level of parallelism we achieve by doing a
>>>>> GroupByKey and looping over the result, so performance shouldn't be
>>>>> impacted if this holds true.
>>>>>
>>>>> It's kind of like combining the shuffle and the data write into the
>>>>> same step.
>>>>>
>>>>> This could, however, significantly reduce cost by eliminating a
>>>>> compute-based shuffle, and also eliminating a Dataflow Shuffle
>>>>> service call if the shuffle service is enabled.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Thanks,
>>>>> Shannon Duncan
>>>>
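Shannon's proposal (a map of open writers built up during processing and closed on teardown) can be sketched outside Beam roughly as below. All names here are illustrative, not Beam's DoFn API, and the sketch carries Reuven's caveat from this thread: every open writer holds a buffer, so keeping many of them in a map on one worker risks running out of memory, which is exactly what the spilledFiles path in WriteFiles.java was written to handle.

```python
class ShardedWriterFn:
    """Illustrative stand-in for the proposed DoFn (not Beam's API):
    lazily open one writer per shard key, append records as they
    arrive, and close every writer on teardown. Memory caveat: each
    open writer buffers, so a large writer map on one worker can OOM.
    """

    def __init__(self, open_writer):
        # open_writer is a hypothetical factory: shard key -> writer.
        self._open_writer = open_writer
        self._writers = {}

    def setup(self):
        self._writers = {}

    def process(self, element):
        shard, record = element
        writer = self._writers.get(shard)
        if writer is None:
            writer = self._writers[shard] = self._open_writer(shard)
        writer.write(record)

    def teardown(self):
        for writer in self._writers.values():
            writer.close()
        self._writers.clear()
```

Under this sketch, records with the same shard key land in the same file without a prior GroupByKey, at the cost of one open (buffering) writer per shard key seen by the worker.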