Sorry I couldn't be more helpful.

*From: *Allie Chen <[email protected]>
*Date: *Tue, May 14, 2019 at 10:09 AM
*To: * <[email protected]>
*Cc: *user
Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.

I am trying to speed up the Reshuffle step, since it waits for all the data. Here are two ways I have tried:

1. Add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle, but it still waits for all the data.

2. Run the pipeline with the --streaming flag, but it leads to an error: Workflow failed. Causes: Expected custom source to have non-zero number of splits. Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features :

*DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.*

- *Streaming autoscaling*

I doubt whether this approach can solve my issue.

Thanks so much!

Allie

*From: *Lukasz Cwik <[email protected]>
*Date: *Tue, May 14, 2019 at 11:16 AM
*To: *dev
*Cc: *user

Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
If not, you could structure your pipeline

ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC

and then run it as a batch pipeline.

You can set --streaming=true on the pipeline and then it will run in a streaming mode, but streaming prioritizes low latency and correctness on Google Cloud Dataflow, so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed, as that may be less expensive than paying the additional compute cost for streaming.

*From: *Allie Chen <[email protected]>
*Date: *Tue, May 14, 2019 at 7:38 AM
*To: * <[email protected]>
*Cc: *user

Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all the data has been read?

Thanks!
Allie

*From: *Lukasz Cwik <[email protected]>
*Date: *Fri, May 10, 2019 at 5:36 PM
*To: *dev
*Cc: *user

There is no such flag to turn off fusion.

Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.

If you can split up your input into a lot of smaller files that are compressed, then you shouldn't need to use the reshuffle, but still could if you found it helped.

On Fri, May 10, 2019 at 2:24 PM Allie Chen <[email protected]> wrote:

Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion?

Re Reuven: No, I checked the run time in the Dataflow UI; the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.

Thanks all,

Allie

*From: *Reuven Lax <[email protected]>
*Date: *Fri, May 10, 2019 at 5:02 PM
*To: *dev
*Cc: *user

It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.
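A minimal Beam Python sketch of the per-file structure Lukasz suggests above, assuming no joins across files are needed; the bucket paths, parse_line function, and BigQuery table are hypothetical placeholders, not anything from the thread:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    def parse_line(line):
        # Hypothetical per-record logic; replace with the real parsing.
        return {'line': line}


    def run():
        # Hypothetical input paths; one independent branch is built per compressed file.
        files = ['gs://my-bucket/fileA.gz',
                 'gs://my-bucket/fileB.gz',
                 'gs://my-bucket/fileC.gz']
        with beam.Pipeline(options=PipelineOptions()) as p:
            for i, path in enumerate(files):
                _ = (p
                     | 'Read%d' % i >> beam.io.ReadFromText(path)      # .gz detected via CompressionTypes.AUTO
                     | 'Reshuffle%d' % i >> beam.Reshuffle()           # optional fusion break per branch
                     | 'Parse%d' % i >> beam.Map(parse_line)
                     | 'Write%d' % i >> beam.io.WriteToBigQuery(
                         'my-project:my_dataset.my_table',             # hypothetical table
                         schema='line:STRING',
                         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Each file gets its own read and (optional) Reshuffle, so a slow single-worker read of one compressed file only blocks its own branch rather than the whole pipeline.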
*From: *Allie Chen <[email protected]>
*Date: *Fri, May 10, 2019 at 1:17 PM
*To: * <[email protected]>
*Cc: * <[email protected]>

Yes, I do see that the data after the reshuffle is processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test (24 gzip files, 17 million lines in total) I did.

The file format for our users is mostly gzip, since uncompressed files would be too costly to store (they could be hundreds of GB).

Thanks,

Allie

*From: *Lukasz Cwik <[email protected]>
*Date: *Fri, May 10, 2019 at 4:07 PM
*To: *dev, <[email protected]>

[email protected] <[email protected]>

Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?

Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?

On Fri, May 10, 2019 at 1:03 PM Allie Chen <[email protected]> wrote:

Hi,

I am trying to load a gzip file to BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms, since Dataflow fused all the transforms together. There is a large amount of data in the file, and I expect to see more workers spinning up after the read transform. I tried to use the Reshuffle transform <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516> to prevent the fusion, but it is not scalable since it won't proceed until all the data has arrived at this point.

Are there any other ways to allow more workers to work on the other transforms after reading?

Thanks,

Allie
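For reference, a minimal sketch of the single-file pattern described in the original question, with a Reshuffle inserted after the read to break fusion; the bucket path, table name, schema, and parsing lambda are hypothetical placeholders:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions()) as p:
        _ = (p
             | 'ReadGzip' >> beam.io.ReadFromText('gs://my-bucket/input.gz')  # single worker: gzip is not splittable
             | 'BreakFusion' >> beam.Reshuffle()        # materializes the data; waits for the read to finish
             | 'Parse' >> beam.Map(lambda line: {'line': line})               # hypothetical parsing
             | 'WriteToBQ' >> beam.io.WriteToBigQuery(
                 'my-project:my_dataset.my_table',                            # hypothetical table
                 schema='line:STRING'))

As discussed above, the Reshuffle restores parallelism for the steps after the read, but it does not start emitting data until the single-worker gzip read has finished, which is the blocking behavior described in this thread.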
