Is there a reason that you need to explicitly specify the number of shards?
If you don't, then this extra shuffle will not be performed.
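
For example (the path and the `lines` PCollection<String> are illustrative),
letting the runner choose the sharding avoids the forced grouping:

// Runner-determined sharding: no forced shard assignment / grouping.
lines.apply(TextIO.write().to("gs://bucket/output"));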

Reuven

On Fri, Sep 27, 2019 at 12:12 PM Shannon Duncan <joseph.dun...@liveramp.com>
wrote:

> Interesting. Right now we are only doing batch processing so I hadn't
> thought about the windowing aspect.
>
> On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax <re...@google.com> wrote:
>
>> Are you doing this in streaming with windowed writes? Window grouping
>> does not "happen" in Beam until a GroupByKey, so you do need the GroupByKey
>> in that case.
>>
>> If you are not windowing but want a specific number of shards (though the
>> general suggestion in that case is not to pick a specific number of shards,
>> but to let the runner pick it for you), your approach could work. However,
>> the implementation would be more complicated than you suggest. The problem
>> is that every file writer has a buffer, and when you force many of them to
>> be held in memory in a map, you risk running out of memory. If you look at
>> the spilledFiles code in WriteFiles.java, it was written to handle exactly
>> this case.
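>>
>> To make the concern concrete (the numbers here are purely illustrative):
>>
>> // Illustrative only: each open file writer typically holds an in-memory
>> // buffer, so one open writer per shard per bundle scales memory with the
>> // shard count.
>> int numShards = 10_000;
>> int bufferBytesPerWriter = 64 * 1024;  // hypothetical buffer size
>> long worstCase = (long) numShards * bufferBytesPerWriter;  // ~650 MB per worker thread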
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> Yes, specifically TextIO withNumShards().
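>>>
>>> A minimal example of what we are doing today (path and the `records`
>>> PCollection<String> are illustrative):
>>>
>>> // Fixed sharding: this is what triggers the shard-key assignment and
>>> // grouping described below.
>>> records.apply(TextIO.write().to("gs://bucket/output").withNumShards(100));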
>>>
>>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I'm not sure what you mean by "write out to a specific shard number."
>>>> Are you talking about FileIO sinks?
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> So when Beam writes out to a specific shard number, as I understand it,
>>>>> it does a few things (rough sketch after the list):
>>>>>
>>>>> - Assigns a shard key to each record (reduces parallelism)
>>>>> - Shuffles and groups by the shard key to colocate all records for a shard
>>>>> - Writes out each shard file within a single DoFn per key...
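>>>>>
>>>>> Something like this (a hand-written sketch of my understanding, not
>>>>> Beam's actual code; `records` is the PCollection<String> being written
>>>>> and WriteShardFn is an illustrative writer DoFn):
>>>>>
>>>>> final int numShards = 10;  // the value passed to withNumShards()
>>>>>
>>>>> // 1. Assign a shard key to each record.
>>>>> PCollection<KV<Integer, String>> keyed = records.apply(
>>>>>     ParDo.of(new DoFn<String, KV<Integer, String>>() {
>>>>>       @ProcessElement
>>>>>       public void process(ProcessContext c) {
>>>>>         int shard = ThreadLocalRandom.current().nextInt(numShards);
>>>>>         c.output(KV.of(shard, c.element()));
>>>>>       }
>>>>>     }));
>>>>>
>>>>> // 2. Shuffle / group so all records for a shard are colocated.
>>>>> PCollection<KV<Integer, Iterable<String>>> grouped =
>>>>>     keyed.apply(GroupByKey.<Integer, String>create());
>>>>>
>>>>> // 3. Write one file per shard key.
>>>>> grouped.apply(ParDo.of(new WriteShardFn()));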
>>>>>
>>>>> Thinking about this, I believe we might be able to eliminate the
>>>>> GroupByKey and write each shard file's records with only a DoFn, once
>>>>> the shard key is assigned.
>>>>>
>>>>> As long as the shard key is the actual key of the PCollection, could we
>>>>> use a state variable so that all records with the same key share state
>>>>> with each other?
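>>>>>
>>>>> Something like this is what I have in mind for the per-key state part
>>>>> (rough sketch only; the state id and coder are just examples, and state
>>>>> in Beam is scoped per key and window):
>>>>>
>>>>> // Stateful DoFn: every record with the same shard key sees the same state.
>>>>> class PerShardBufferFn extends DoFn<KV<Integer, String>, Void> {
>>>>>   @StateId("buffer")
>>>>>   private final StateSpec<BagState<String>> bufferSpec =
>>>>>       StateSpecs.bag(StringUtf8Coder.of());
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(ProcessContext c,
>>>>>       @StateId("buffer") BagState<String> buffer) {
>>>>>     buffer.add(c.element().getValue());  // shared across the shard key
>>>>>   }
>>>>> }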
>>>>>
>>>>> In a DoFn, can we use setup to hold a Map of the files being written to
>>>>> within bundles on that instance, and then close all of the files in the
>>>>> map on teardown?
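>>>>>
>>>>> And for the setup/teardown part, roughly this (rough sketch only;
>>>>> newWriterForShard is a hypothetical helper that opens the file for a
>>>>> shard, and real code would probably need to flush in @FinishBundle since
>>>>> @Teardown is best-effort):
>>>>>
>>>>> class WritePerShardFn extends DoFn<KV<Integer, String>, Void> {
>>>>>   private transient Map<Integer, Writer> writers;
>>>>>
>>>>>   @Setup
>>>>>   public void setup() {
>>>>>     writers = new HashMap<>();  // one open writer per shard on this instance
>>>>>   }
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(ProcessContext c) throws IOException {
>>>>>     KV<Integer, String> kv = c.element();
>>>>>     Writer w = writers.get(kv.getKey());
>>>>>     if (w == null) {
>>>>>       w = newWriterForShard(kv.getKey());  // hypothetical helper
>>>>>       writers.put(kv.getKey(), w);
>>>>>     }
>>>>>     w.write(kv.getValue());
>>>>>     w.write('\n');
>>>>>   }
>>>>>
>>>>>   @Teardown
>>>>>   public void teardown() throws IOException {
>>>>>     for (Writer w : writers.values()) {
>>>>>       w.close();
>>>>>     }
>>>>>   }
>>>>> }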
>>>>>
>>>>> If this is the case, does it reduce the need for a shuffle and allow a
>>>>> DoFn to safely write out in append mode to a file, batch, etc. held in
>>>>> state?
>>>>>
>>>>> It doesn't really decrease parallelism after the key is assigned, since
>>>>> it can parallelize over each key within its state window, which is the
>>>>> same level of parallelism we achieve by doing a GroupByKey and a for loop
>>>>> over the result. So performance shouldn't be impacted if this holds true.
>>>>>
>>>>> It's kind of like combining both the shuffle and the data write in the
>>>>> same step?
>>>>>
>>>>> This does, however, offer a significant cost reduction by eliminating a
>>>>> compute-based shuffle and also eliminating a Dataflow Shuffle service call
>>>>> if the shuffle service is enabled.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Thanks,
>>>>> Shannon Duncan
>>>>>
>>>>
