Hi

This project is a completely different solution towards this problem, but
in the hadoop mapreduce context.

https://github.com/nielsbasjes/splittablegzip


I have used this a lot in the past.
Perhaps porting this project to beam is an option?

Niels Basjes



On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:

> Sorry I couldn't be more helpful.
>
> *From: *Allie Chen <yifangc...@google.com>
> *Date: *Tue, May 14, 2019 at 10:09 AM
> *To: * <dev@beam.apache.org>
> *Cc: *user
>
> Thank Lukasz. Unfortunately, decompressing the files is not an option for
>> us.
>>
>>
>> I am trying to speed up Reshuffle step, since it waits for all data. Here
>> are two ways I have tried:
>>
>> 1.  add timestamps to the PCollection's elements after reading (since it
>> is bounded source), then apply windowing before Reshuffle, but it still
>> waits all data.
>>
>>
>> 2.  run the pipeline with --streaming flag, but it leads to an error:
>> Workflow failed. Causes: Expected custom source to have non-zero number of
>> splits. Also, I found in
>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
>> :
>>
>> *DataflowRunner does not currently support the following Cloud Dataflow
>> specific features with Python streaming execution.*
>>
>>    -
>>
>>    *Streaming autoscaling*
>>
>> I doubt whether this approach can solve my issue.
>>
>>
>> Thanks so much!
>>
>> Allie
>>
>> *From: *Lukasz Cwik <lc...@google.com>
>> *Date: *Tue, May 14, 2019 at 11:16 AM
>> *To: *dev
>> *Cc: *user
>>
>> Do you need to perform any joins across the files (e.g.
>>> Combine.perKey/GroupByKey/...)?
>>> If not, you could structure your pipeline
>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>> and then run it as a batch pipeline.
>>>
>>> You can set --streaming=true on the pipeline and then it will run in a
>>> streaming mode but streaming prioritizes low latency and correctness on
>>> Google Cloud Dataflow so it will cost more to run your pipeline then in
>>> batch mode. It may make more sense to store the data uncompressed as it may
>>> be less expensive then paying the additional compute cost for streaming.
>>>
>>> *From: *Allie Chen <yifangc...@google.com>
>>> *Date: *Tue, May 14, 2019 at 7:38 AM
>>> *To: * <dev@beam.apache.org>
>>> *Cc: *user
>>>
>>> Is it possible to use windowing or somehow pretend it is streaming so
>>>> Reshuffle or GroupByKey won't wait until all data has been read?
>>>>
>>>> Thanks!
>>>> Allie
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 5:36 PM
>>>> *To: *dev
>>>> *Cc: *user
>>>>
>>>> There is no such flag to turn of fusion.
>>>>>
>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>>>> when it is limited to a small number of workers.
>>>>>
>>>>> If you can split up your input into a lot of smaller files that are
>>>>> compressed then you shouldn't need to use the reshuffle but still could if
>>>>> you found it helped.
>>>>>
>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Re Lukasz: Thanks! I am not able to control the compression format
>>>>>> but I will see whether the splitting gzip files will work. Is there a
>>>>>> simple flag in Dataflow that could turn off the fusion?
>>>>>>
>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey
>>>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>>>> itself is not parallel either.
>>>>>>
>>>>>> Thanks all,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>> *From: *Reuven Lax <re...@google.com>
>>>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>>>> *To: *dev
>>>>>> *Cc: *user
>>>>>>
>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely
>>>>>>> that simply reading and decompressing all that data was very slow when
>>>>>>> there was no parallelism.
>>>>>>>
>>>>>>> *From: *Allie Chen <yifangc...@google.com>
>>>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>>>> *To: * <dev@beam.apache.org>
>>>>>>> *Cc: * <u...@beam.apache.org>
>>>>>>>
>>>>>>> Yes, I do see the data after reshuffle are processed in parallel.
>>>>>>>> But Reshuffle transform itself takes hours or even days to run, 
>>>>>>>> according
>>>>>>>> to one test (24 gzip files, 17 million lines in total) I did.
>>>>>>>>
>>>>>>>> The file format for our users are mostly gzip format, since
>>>>>>>> uncompressed files would be too costly to store (It could be in 
>>>>>>>> hundreds of
>>>>>>>> GB).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Allie
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>>>> *To: *dev, <u...@beam.apache.org>
>>>>>>>>
>>>>>>>> +u...@beam.apache.org <u...@beam.apache.org>
>>>>>>>>>
>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits
>>>>>>>>> till all the data has been read before the next transforms can run. 
>>>>>>>>> After
>>>>>>>>> the reshuffle, the data should have been processed in parallel across 
>>>>>>>>> the
>>>>>>>>> workers. Did you see this?
>>>>>>>>>
>>>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>>>> uncompressed file or many compressed files?
>>>>>>>>>
>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since
>>>>>>>>>> the compressed file is not splittable, one worker is allocated to 
>>>>>>>>>> read the
>>>>>>>>>> file. The same worker will do all the other transforms since 
>>>>>>>>>> Dataflow fused
>>>>>>>>>> all transforms together.  There are a large amount of data in the 
>>>>>>>>>> file, and
>>>>>>>>>> I expect to see more workers spinning up after reading transforms. I 
>>>>>>>>>> tried
>>>>>>>>>> to use Reshuffle Transform
>>>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed 
>>>>>>>>>> until
>>>>>>>>>> all data arrived at this point.
>>>>>>>>>>
>>>>>>>>>> Is there any other ways to allow more workers working on all the
>>>>>>>>>> other transforms after reading?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Allie
>>>>>>>>>>
>>>>>>>>>>

Reply via email to