Maybe the solution implemented on JdbcIO [1], [2] could be helpful in this
cases.

[1] https://issues.apache.org/jira/browse/BEAM-2803
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118

On Fri, May 10, 2019 at 11:36 PM Lukasz Cwik <lc...@google.com> wrote:

> There is no such flag to turn of fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com> wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether the splitting gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax <re...@google.com>
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen <yifangc...@google.com>
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * <dev@beam.apache.org>
>>> *Cc: * <u...@beam.apache.org>
>>>
>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>>> Reshuffle transform itself takes hours or even days to run, according to
>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>
>>>> The file format for our users are mostly gzip format, since
>>>> uncompressed files would be too costly to store (It could be in hundreds of
>>>> GB).
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>> *To: *dev, <u...@beam.apache.org>
>>>>
>>>> +u...@beam.apache.org <u...@beam.apache.org>
>>>>>
>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>> all the data has been read before the next transforms can run. After the
>>>>> reshuffle, the data should have been processed in parallel across the
>>>>> workers. Did you see this?
>>>>>
>>>>> Are you able to change the input of your pipeline to use an
>>>>> uncompressed file or many compressed files?
>>>>>
>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since the
>>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>>> file. The same worker will do all the other transforms since Dataflow 
>>>>>> fused
>>>>>> all transforms together.  There are a large amount of data in the file, 
>>>>>> and
>>>>>> I expect to see more workers spinning up after reading transforms. I 
>>>>>> tried
>>>>>> to use Reshuffle Transform
>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed 
>>>>>> until
>>>>>> all data arrived at this point.
>>>>>>
>>>>>> Is there any other ways to allow more workers working on all the
>>>>>> other transforms after reading?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>

Reply via email to