As a follow-up, it is confirmed that pricing is based on the number of bytes
written to the shuffle plus the number of bytes read from it.
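
To make that accounting concrete, here is a minimal sketch in plain Java. It
assumes the counter is simply bytes written plus bytes read, as Reuven
describes below, and that shuffled data is read back exactly once. The class
and method names are made up for illustration and are not part of Beam or
Dataflow.

```java
// Hypothetical helper, not a Beam or Dataflow API: models the shuffle
// counter as bytes written to shuffle plus bytes read back from it.
final class ShuffleCounterSketch {

    // Counter value for explicit written/read byte counts.
    static long expectedCounterBytes(long bytesWritten, long bytesRead) {
        return Math.addExact(bytesWritten, bytesRead);
    }

    // Assumption: everything written to shuffle is read back exactly once.
    static long expectedCounterBytes(long bytesShuffled) {
        return expectedCounterBytes(bytesShuffled, bytesShuffled);
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        long counter = expectedCounterBytes(oneGiB);
        System.out.println(counter / (double) oneGiB + " GiB on the counter");
    }
}
```

Under those assumptions, shuffling 1 GiB shows up as 2 GiB on the counter.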

However, we were able to figure out a way to lower shuffle costs, and all is
right in the world again.

Thanks, y'all!

On Wed, Sep 18, 2019 at 4:52 PM Reuven Lax <> wrote:

> I believe that the "Total shuffle data processed" counter counts the number of
> bytes written to shuffle + the number of bytes read. So if you shuffle 1GB
> of data, you should expect to see 2GB on the counter.
> On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan <>
> wrote:
>> OK, I just ran the job on a small input and did not specify numShards, so
>> it's literally just:
>> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>> Output of map for join:
>> [image: image.png]
>> Details of Shuffle:
>> [image: image.png]
>> Reported Bytes Shuffled:
>> [image: image.png]
>> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <> wrote:
>>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <> wrote:
>>>> I will attempt to do without sharding (though I believe we did do a run
>>>> without shards and it incurred the extra shuffle costs).
>>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>>> small amount of data (essentially a list of filenames).
>>>> Pipeline is simple.
>>>> The only shuffle that is explicitly defined is the shuffle after
>>>> merging files together into a single PCollection (Flatten Transform).
>>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>>> to pay for the middle shuffle but were surprised to see that the
>>>> shuffled GB reported in Dataflow was quadruple the output data from the
>>>> Flatten, which led me down this path of finding things.
>>>> [image: image.png]
>>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <> wrote:
>>>>> In that case you should be able to leave sharding unspecified, and you
>>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>>> necessary only for streaming.
>>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <> wrote:
>>>>>> Batch on DataflowRunner.
>>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <> wrote:
>>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <> wrote:
>>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>>>> write to a single file.
>>>>>>>> However... I wonder if there is a way to just take the records that
>>>>>>>> are on a worker and write them out, thus avoiding both the shard
>>>>>>>> number and this grouping. Closer to how Hadoop handles writes.
>>>>>>>> Maybe just a regular ParDo, and on bundleSetup it creates a writer
>>>>>>>> and processElement reuses that writer to write to the same file for
>>>>>>>> all elements within a bundle?
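
The per-bundle writer idea above can be sketched in plain Java as follows.
This only mimics a start-bundle / process-element / finish-bundle lifecycle.
The class and method names are hypothetical and are not Beam APIs, and the
sketch ignores bundle retries, which in a real pipeline could leave partial
or duplicate files behind.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a DoFn-like lifecycle; not a Beam class.
final class PerBundleWriterSketch {
    private final Path outputDir;
    private Path currentFile;
    private BufferedWriter writer;

    PerBundleWriterSketch(Path outputDir) {
        this.outputDir = outputDir;
    }

    // Called once per bundle: open a single file for the whole bundle.
    void startBundle() throws IOException {
        currentFile = Files.createTempFile(outputDir, "bundle-", ".txt");
        writer = Files.newBufferedWriter(currentFile, StandardCharsets.UTF_8);
    }

    // Called per element: reuse the bundle's writer, no grouping by shard key.
    void processElement(String line) throws IOException {
        writer.write(line);
        writer.newLine();
    }

    // Called once per bundle: close the file and report where it was written.
    Path finishBundle() throws IOException {
        writer.close();
        writer = null;
        return currentFile;
    }

    // Drives one bundle through the lifecycle and reads the file back.
    static List<String> roundTrip(List<String> bundle) {
        try {
            Path dir = Files.createTempDirectory("per-bundle-sketch");
            PerBundleWriterSketch fn = new PerBundleWriterSketch(dir);
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            return Files.readAllLines(fn.finishBundle(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("a", "b", "c")));
    }
}
```

Each bundle produces its own file here, so the number and size of output
files would depend on how the runner happens to split the work into bundles.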
>>>>>>>> I feel like this goes beyond the scope of the user mailing list, so
>>>>>>>> I'm expanding it to dev as well.
>>>>>>>> +dev <>
>>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <> wrote:
>>>>>>>>> We have been using Beam for a bit now. However, we just turned on
>>>>>>>>> the Dataflow shuffle service and were very surprised that the
>>>>>>>>> shuffled data amounts were quadruple the amounts we expected.
>>>>>>>>> It turns out that TextIO's file writing does shuffles internally.
>>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>> Thanks,
>>>>>>>>> Shannon Duncan
