Sorry I couldn't be more helpful.

*From: *Allie Chen <[email protected]>
*Date: *Tue, May 14, 2019 at 10:09 AM
*To: * <[email protected]>
*Cc: *user

> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
> us.
>
>
> I am trying to speed up the Reshuffle step, since it waits for all the data.
> Here are two approaches I have tried:
>
> 1.  Add timestamps to the PCollection's elements after reading (since it
> is a bounded source), then apply windowing before Reshuffle, but it still
> waits for all the data (a sketch of this attempt is below).
>
>
> 2.  Run the pipeline with the --streaming flag, but it leads to an error:
> "Workflow failed. Causes: Expected custom source to have non-zero number of
> splits." Also, I found the following in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
>
> *DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.*
>
>    - *Streaming autoscaling*
>
> I doubt that this approach can solve my issue.
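>
> For reference, the first approach looked roughly like this (a minimal
> sketch; the file path, window size, and downstream step are placeholders):
>
>     import time
>
>     import apache_beam as beam
>     from apache_beam.transforms import window
>     from apache_beam.transforms.util import Reshuffle
>
>     with beam.Pipeline() as p:
>         _ = (
>             p
>             | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input.gz')
>             | 'AddTimestamps' >> beam.Map(
>                 lambda line: window.TimestampedValue(line, time.time()))
>             | 'Window' >> beam.WindowInto(window.FixedWindows(60))
>             | 'Reshuffle' >> Reshuffle()
>             | 'Process' >> beam.Map(lambda line: line))  # real transforms here
>
> It runs, but the Reshuffle still waits for all the input.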
>
>
> Thanks so much!
>
> Allie
>
> *From: *Lukasz Cwik <[email protected]>
> *Date: *Tue, May 14, 2019 at 11:16 AM
> *To: *dev
> *Cc: *user
>
>> Do you need to perform any joins across the files (e.g.
>> Combine.perKey/GroupByKey/...)?
>> If not, you could structure your pipeline
>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> and then run it as a batch pipeline.
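>>
>> For example, something like this (a rough sketch; the file names and the
>> per-file processing are placeholders):
>>
>>     import apache_beam as beam
>>     from apache_beam.transforms.util import Reshuffle
>>
>>     def process_line(line):
>>         return line  # stand-in for each file's real transforms
>>
>>     with beam.Pipeline() as p:
>>         for name in ['fileA', 'fileB', 'fileC']:
>>             _ = (
>>                 p
>>                 | 'Read_%s' % name >> beam.io.ReadFromText(
>>                     'gs://my-bucket/%s.gz' % name)
>>                 | 'Reshuffle_%s' % name >> Reshuffle()  # optional
>>                 | 'Process_%s' % name >> beam.Map(process_line))
>>
>> Each read is its own source, so the branches can run on different workers
>> even though each gzip file is still read by a single worker.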
>>
>> You can set --streaming=true on the pipeline and it will run in streaming
>> mode, but streaming prioritizes low latency and correctness on Google Cloud
>> Dataflow, so it will cost more to run your pipeline than in batch mode. It
>> may make more sense to store the data uncompressed, as that may be less
>> expensive than paying the additional compute cost for streaming.
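>>
>> In the Python SDK, streaming mode can also be enabled programmatically
>> (a minimal sketch; the other pipeline options are omitted):
>>
>>     from apache_beam.options.pipeline_options import (
>>         PipelineOptions, StandardOptions)
>>
>>     options = PipelineOptions()  # your --runner/--project/etc. go here
>>     options.view_as(StandardOptions).streaming = True
>>     # then: beam.Pipeline(options=options)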
>>
>> *From: *Allie Chen <[email protected]>
>> *Date: *Tue, May 14, 2019 at 7:38 AM
>> *To: * <[email protected]>
>> *Cc: *user
>>
>>> Is it possible to use windowing, or to somehow pretend it is streaming, so
>>> Reshuffle or GroupByKey won't wait until all data has been read?
>>>
>>> Thanks!
>>> Allie
>>>
>>> *From: *Lukasz Cwik <[email protected]>
>>> *Date: *Fri, May 10, 2019 at 5:36 PM
>>> *To: *dev
>>> *Cc: *user
>>>
>>>> There is no such flag to turn off fusion.
>>>>
>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>>> when it is limited to a small number of workers.
>>>>
>>>> If you can split up your input into a lot of smaller compressed files,
>>>> then you shouldn't need to use the reshuffle, but you still could if you
>>>> find it helps.
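>>>>
>>>> For example, reading many shards with a glob (a sketch; the pattern is a
>>>> placeholder):
>>>>
>>>>     import apache_beam as beam
>>>>
>>>>     with beam.Pipeline() as p:
>>>>         lines = p | beam.io.ReadFromText('gs://my-bucket/shards/part-*.gz')
>>>>         # Each gzip file is still read by one worker, but with many
>>>>         # files the reads run in parallel across workers.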
>>>>
>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <[email protected]>
>>>> wrote:
>>>>
>>>>> Re Lukasz: Thanks! I am not able to control the compression format, but
>>>>> I will see whether splitting the gzip files will work. Is there a simple
>>>>> flag in Dataflow that could turn off the fusion?
>>>>>
>>>>> Re Reuven: No, I checked the run time in the Dataflow UI; the GroupByKey
>>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>>> itself is not parallel either.
>>>>>
>>>>> Thanks all,
>>>>>
>>>>> Allie
>>>>>
>>>>> *From: *Reuven Lax <[email protected]>
>>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>>> *To: *dev
>>>>> *Cc: *user
>>>>>
>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>>>> simply reading and decompressing all that data was very slow when there
>>>>>> was no parallelism.
>>>>>>
>>>>>> *From: *Allie Chen <[email protected]>
>>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>>> *To: * <[email protected]>
>>>>>> *Cc: * <[email protected]>
>>>>>>
>>>>>>> Yes, I do see that the data after Reshuffle are processed in parallel.
>>>>>>> But the Reshuffle transform itself takes hours or even days to run,
>>>>>>> according to one test I did (24 gzip files, 17 million lines in total).
>>>>>>>
>>>>>>> Our users' files are mostly in gzip format, since uncompressed files
>>>>>>> would be too costly to store (they could run to hundreds of GB).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>>
>>>>>>> *From: *Lukasz Cwik <[email protected]>
>>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>>> *To: *dev, <[email protected]>
>>>>>>>
>>>>>>> [email protected] <[email protected]>
>>>>>>>>
>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits until
>>>>>>>> all the data has been read before the next transforms can run. After
>>>>>>>> the reshuffle, the data should have been processed in parallel across
>>>>>>>> the workers. Did you see this?
>>>>>>>>
>>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>>> uncompressed file or many compressed files?
>>>>>>>>
>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since
>>>>>>>>> the compressed file is not splittable, one worker is allocated to read
>>>>>>>>> the file. The same worker will do all the other transforms, since
>>>>>>>>> Dataflow fused all the transforms together. There is a large amount of
>>>>>>>>> data in the file, and I expect to see more workers spinning up after
>>>>>>>>> the read transform. I tried to use the Reshuffle transform
>>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>>> to prevent the fusion, but it is not scalable since it won't proceed
>>>>>>>>> until all the data has arrived at that point.
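>>>>>>>>>
>>>>>>>>> The pipeline looks roughly like this (a simplified sketch; the path,
>>>>>>>>> parser, table, and schema are placeholders):
>>>>>>>>>
>>>>>>>>>     import apache_beam as beam
>>>>>>>>>     from apache_beam.transforms.util import Reshuffle
>>>>>>>>>
>>>>>>>>>     def parse_line(line):
>>>>>>>>>         return {'line': line}  # stand-in for the real parsing
>>>>>>>>>
>>>>>>>>>     with beam.Pipeline() as p:
>>>>>>>>>         _ = (
>>>>>>>>>             p
>>>>>>>>>             | 'Read' >> beam.io.ReadFromText('gs://my-bucket/data.gz')
>>>>>>>>>             | 'Reshuffle' >> Reshuffle()  # breaks fusion, but blocks
>>>>>>>>>             | 'Parse' >> beam.Map(parse_line)
>>>>>>>>>             | 'Write' >> beam.io.WriteToBigQuery(
>>>>>>>>>                 'my-project:my_dataset.my_table',
>>>>>>>>>                 schema='line:STRING'))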
>>>>>>>>>
>>>>>>>>> Are there any other ways to allow more workers to work on all the
>>>>>>>>> other transforms after reading?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Allie
>>>>>>>>>
>>>>>>>>>
