Hi, This project is a completely different solution to this problem, but in the Hadoop MapReduce context.
https://github.com/nielsbasjes/splittablegzip
I have used this a lot in the past. Perhaps porting this project to Beam is an option?

Niels Basjes

On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:

Sorry I couldn't be more helpful.

*From: *Allie Chen <yifangc...@google.com>
*Date: *Tue, May 14, 2019 at 10:09 AM
*To: * <dev@beam.apache.org>
*Cc: *user

Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.

I am trying to speed up the Reshuffle step, since it waits for all data. Here are two ways I have tried:

1. Add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle, but it still waits for all data.

2. Run the pipeline with the --streaming flag, but it leads to an error: "Workflow failed. Causes: Expected custom source to have non-zero number of splits." Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features :

*DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.*

- *Streaming autoscaling*

I doubt whether this approach can solve my issue.

Thanks so much!

Allie

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Tue, May 14, 2019 at 11:16 AM
*To: *dev
*Cc: *user

Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
If not, you could structure your pipeline
ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
and then run it as a batch pipeline.

You can set --streaming=true on the pipeline and then it will run in streaming mode, but streaming prioritizes low latency and correctness on Google Cloud Dataflow, so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed, as that may be less expensive than paying the additional compute cost for streaming.

*From: *Allie Chen <yifangc...@google.com>
*Date: *Tue, May 14, 2019 at 7:38 AM
*To: * <dev@beam.apache.org>
*Cc: *user

Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read?

Thanks!
Allie

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 5:36 PM
*To: *dev
*Cc: *user

There is no such flag to turn off fusion.

Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.

If you can split up your input into a lot of smaller files that are compressed, then you shouldn't need to use the reshuffle, but still could if you found it helped.

On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com> wrote:

Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion?

Re Reuven: No, I checked the run time on the Dataflow UI; the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.

Thanks all,

Allie
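A minimal sketch of the branch-per-file structure Lukasz describes above (ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA, run as one batch pipeline), assuming the Beam Python SDK; the bucket paths and the per-branch processing step are placeholders, not part of the original thread:

    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder inputs: each compressed file becomes its own branch.
    FILES = [
        'gs://my-bucket/fileA.gz',
        'gs://my-bucket/fileB.gz',
        'gs://my-bucket/fileC.gz',
    ]

    def process_lines(lines, label):
        # Stand-in for "CopyOfPipelineA/B/C": whatever per-line transforms you need.
        return lines | f'Process {label}' >> beam.Map(lambda line: line.strip())

    with beam.Pipeline(options=PipelineOptions()) as p:
        for i, path in enumerate(FILES):
            lines = (
                p
                | f'Read {i}' >> ReadFromText(path, compression_type=CompressionTypes.GZIP)
                | f'Reshuffle {i}' >> beam.Reshuffle()  # optional: breaks fusion within this branch
            )
            process_lines(lines, i)

Each file gets its own reader, so the branches can progress independently even though every individual gzip file is still read by a single worker.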
*From: *Reuven Lax <re...@google.com>
*Date: *Fri, May 10, 2019 at 5:02 PM
*To: *dev
*Cc: *user

It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.

*From: *Allie Chen <yifangc...@google.com>
*Date: *Fri, May 10, 2019 at 1:17 PM
*To: * <dev@beam.apache.org>
*Cc: * <u...@beam.apache.org>

Yes, I do see that the data after Reshuffle are processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test (24 gzip files, 17 million lines in total) I did.

The file format for our users is mostly gzip, since uncompressed files would be too costly to store (it could be hundreds of GB).

Thanks,

Allie

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 4:07 PM
*To: *dev, <u...@beam.apache.org>

+u...@beam.apache.org <u...@beam.apache.org>

Reshuffle on Google Cloud Dataflow for a bounded pipeline waits until all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?

Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?

On Fri, May 10, 2019 at 1:03 PM Allie Chen <yifangc...@google.com> wrote:

Hi,

I am trying to load a gzip file to BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker then does all the other transforms, since Dataflow fuses all transforms together. There is a large amount of data in the file, and I expected to see more workers spinning up after the read transform. I tried to use the Reshuffle Transform <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516> to prevent the fusion, but it is not scalable since it won't proceed until all data has arrived at this point.

Is there any other way to allow more workers to work on all the other transforms after reading?

Thanks,

Allie
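For reference, a minimal sketch of the pipeline shape the original question describes (a single gzip read, then Reshuffle to break fusion before the remaining transforms), assuming the Beam Python SDK; the bucket path, parser, table name, and schema are placeholders, not part of the original thread:

    import apache_beam as beam

    def parse_line(line):
        # Placeholder parser turning a CSV-like line into a BigQuery row dict.
        name, value = line.split(',', 1)
        return {'name': name, 'value': value}

    with beam.Pipeline() as p:
        (
            p
            # A single .gz file is not splittable, so one worker does the read.
            | 'ReadGzip' >> beam.io.ReadFromText('gs://my-bucket/input.gz')
            # Reshuffle materializes the data so the steps after it can run on
            # many workers, but in batch mode it waits for the whole read to finish.
            | 'BreakFusion' >> beam.Reshuffle()
            | 'Parse' >> beam.Map(parse_line)
            | 'Write' >> beam.io.WriteToBigQuery(
                'my-project:my_dataset.my_table',
                schema='name:STRING,value:STRING',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

As discussed in the thread, the Reshuffle step itself still has to wait for the single reader to finish, which is exactly the bottleneck being debated here.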