Do you need to perform any joins across the files (e.g.
Combine.perKey/GroupByKey/...)?
If not, you could structure your pipeline as:
ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
and then run it as a batch pipeline.

You can set --streaming=true on the pipeline and it will then run in
streaming mode, but streaming prioritizes low latency and correctness on
Google Cloud Dataflow, so it will cost more to run your pipeline than in
batch mode. It may make more sense to store the data uncompressed, as that may
be less expensive than paying the additional compute cost for streaming.
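
If re-encoding the input is an option, the suggestion quoted further down in
this thread (many smaller compressed files instead of one large one) might
look roughly like this. It is only a sketch using the Python standard library:
the function name split_gzip, the shard naming, and the default shard size are
assumptions, not anything Beam or Dataflow provides.

```python
import gzip
from pathlib import Path


def split_gzip(src, out_dir, lines_per_shard=1_000_000):
    """Rewrites one gzip text file as several smaller gzip files.

    Returns the list of shard paths. Shard naming is an arbitrary choice.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    shards = []
    out = None
    # newline="" keeps the original line endings untouched.
    with gzip.open(src, "rt", encoding="utf-8", newline="") as reader:
        for i, line in enumerate(reader):
            if i % lines_per_shard == 0:
                # Start a new shard every lines_per_shard lines.
                if out is not None:
                    out.close()
                path = out_dir / f"shard-{len(shards):05d}.gz"
                out = gzip.open(path, "wt", encoding="utf-8", newline="")
                shards.append(path)
            out.write(line)
    if out is not None:
        out.close()
    return shards
```

The split itself still reads the source file sequentially, but it only has to
be done once, and afterwards each shard can be read by a different worker.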

*From: *Allie Chen <yifangc...@google.com>
*Date: *Tue, May 14, 2019 at 7:38 AM
*To: * <dev@beam.apache.org>
*Cc: *user

Is it possible to use windowing or somehow pretend it is streaming so
> Reshuffle or GroupByKey won't wait until all data has been read?
>
> Thanks!
> Allie
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Fri, May 10, 2019 at 5:36 PM
> *To: *dev
> *Cc: *user
>
> There is no such flag to turn off fusion.
>>
>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>> when it is limited to a small number of workers.
>>
>> If you can split up your input into a lot of smaller files that are
>> compressed then you shouldn't need to use the reshuffle but still could if
>> you found it helped.
>>
>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com> wrote:
>>
>>> Re Lukasz: Thanks! I am not able to control the compression format but I
>>> will see whether splitting the gzip files will work. Is there a simple flag
>>> in Dataflow that could turn off the fusion?
>>>
>>> Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey and
>>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>>> is not parallel either.
>>>
>>> Thanks all,
>>>
>>> Allie
>>>
>>> *From: *Reuven Lax <re...@google.com>
>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>> *To: *dev
>>> *Cc: *user
>>>
>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>> simply reading and decompressing all that data was very slow when there was
>>>> no parallelism.
>>>>
>>>> *From: *Allie Chen <yifangc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>> *To: * <dev@beam.apache.org>
>>>> *Cc: * <u...@beam.apache.org>
>>>>
Yes, I do see that the data after the reshuffle is processed in parallel. But
>>>>> the Reshuffle transform itself takes hours or even days to run, according to
>>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>>
>>>>> The file format for our users is mostly gzip, since
>>>>> uncompressed files would be too costly to store (they could be hundreds
>>>>> of GB).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Allie
>>>>>
>>>>>
>>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>> *To: *dev, <u...@beam.apache.org>
>>>>>
>>>>> +u...@beam.apache.org <u...@beam.apache.org>
>>>>>>
>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>>> all the data has been read before the next transforms can run. After the
>>>>>> reshuffle, the data should have been processed in parallel across the
>>>>>> workers. Did you see this?
>>>>>>
>>>>>> Are you able to change the input of your pipeline to use an
>>>>>> uncompressed file or many compressed files?
>>>>>>
>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>>>> file. The same worker will do all the other transforms since Dataflow
>>>>>>> fused all transforms together. There is a large amount of data in the
>>>>>>> file, and I expect to see more workers spinning up after the reading
>>>>>>> transforms. I tried to use the Reshuffle transform
>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed
>>>>>>> until all data has arrived at this point.
>>>>>>>
>>>>>>> Are there any other ways to allow more workers to work on all the
>>>>>>> other transforms after reading?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>>
