@Robert

Does your suggestion imply that the points Eugene made on BEAM-2803 no
longer apply, and that the combined reshuffle could simply be omitted?

On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <rober...@google.com> wrote:

> Unfortunately the "write" portion of the reshuffle cannot be
> parallelized more than the source that it's reading from. In my
> experience, generally the read is the bottleneck in this case, but
> it's possible (e.g. if the input compresses extremely well) that it is
> the write that is slow (which you seem to indicate based on your
> observation of the UI, right?).
>
> It could be that materializing to temporary files is cheaper than
> materializing randomly to shuffle (especially on pre-portable Python).
> In that case you could force a fusion break with a side input instead.
> E.g.
>
> class FusionBreak(beam.PTransform):
>     def expand(self, pcoll):
>         # Create an empty PCollection that depends on pcoll.
>         empty = pcoll | beam.FlatMap(lambda x: ())
>         # Use this empty PCollection as a side input, which will
>         # force a fusion break.
>         return pcoll | beam.Map(
>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>
> which could be used in place of Reshuffle like
>
>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>
> You'll probably want to be sure to pass the use_fastavro experiment as
> well.
>
> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
> >
> > Hi
> >
> > This project is a completely different solution to this problem, but in the Hadoop MapReduce context.
> >
> > https://github.com/nielsbasjes/splittablegzip
> >
> >
> > I have used this a lot in the past.
> > Perhaps porting this project to Beam is an option?
> >
> > Niels Basjes
> >
> >
> >
> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
> >>
> >> Sorry I couldn't be more helpful.
> >>
> >> From: Allie Chen <yifangc...@google.com>
> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> To: <dev@beam.apache.org>
> >> Cc: user
> >>
> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.
> >>>
> >>>
> >>> I am trying to speed up the Reshuffle step, since it waits for all data. Here are two ways I have tried:
> >>>
> >>> 1.  Add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle, but it still waits for all data.
> >>>
> >>>
> >>> 2.  Run the pipeline with the --streaming flag, but it leads to an error: "Workflow failed. Causes: Expected custom source to have non-zero number of splits." Also, I found this in
> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features :
> >>>
> >>> DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.
> >>>
> >>> Streaming autoscaling
> >>>
> >>> I doubt whether this approach can solve my issue.
> >>>
> >>>
> >>> Thanks so much!
> >>>
> >>> Allie
> >>>
> >>>
> >>> From: Lukasz Cwik <lc...@google.com>
> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >>> To: dev
> >>> Cc: user
> >>>
> >>>> Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> >>>> If not, you could structure your pipeline
> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> >>>> and then run it as a batch pipeline.
> >>>>
> >>>> You can set --streaming=true on the pipeline and it will then run in streaming mode, but streaming prioritizes low latency and correctness on Google Cloud Dataflow, so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed, as that may be less expensive than paying the additional compute cost for streaming.
> >>>>
> >>>> From: Allie Chen <yifangc...@google.com>
> >>>> Date: Tue, May 14, 2019 at 7:38 AM
> >>>> To: <dev@beam.apache.org>
> >>>> Cc: user
> >>>>
> >>>>> Is it possible to use windowing or somehow pretend it is streaming
> so Reshuffle or GroupByKey won't wait until all data has been read?
> >>>>>
> >>>>> Thanks!
> >>>>> Allie
> >>>>>
> >>>>> From: Lukasz Cwik <lc...@google.com>
> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
> >>>>> To: dev
> >>>>> Cc: user
> >>>>>
> >>>>>> There is no such flag to turn off fusion.
> >>>>>>
> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take
> time when it is limited to a small number of workers.
> >>>>>>
> >>>>>> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
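
A minimal sketch of the "many smaller compressed files" suggestion above, using only the Python standard library. The function name, the shard naming scheme, and the default shard size are illustrative assumptions, not anything from Beam or Dataflow. It streams one large gzip file and rewrites it as several smaller gzip files, split on line boundaries, so the uncompressed data never has to be stored.

```python
import gzip
import os


def split_gzip_by_lines(src_path, out_dir, lines_per_shard=100_000):
    """Rewrite one gzip file as several smaller gzip files (hypothetical helper).

    Shards are cut on line boundaries, so every record stays intact.
    Returns the list of shard paths in input order.
    """
    os.makedirs(out_dir, exist_ok=True)
    shard_paths = []
    out = None
    try:
        # Stream the input: only one line is held in memory at a time.
        with gzip.open(src_path, "rb") as src:
            for i, line in enumerate(src):
                if i % lines_per_shard == 0:
                    # Start a new shard every `lines_per_shard` lines.
                    if out is not None:
                        out.close()
                    path = os.path.join(
                        out_dir, f"shard-{len(shard_paths):05d}.gz")
                    out = gzip.open(path, "wb")
                    shard_paths.append(path)
                out.write(line)
    finally:
        # Close the last open shard, also when an error interrupts the loop.
        if out is not None:
            out.close()
    return shard_paths
```

The resulting shards could then be read with a file pattern such as `out_dir/shard-*.gz`, so that each file becomes its own unit of read work. The splitting pass itself is still sequential, so this only pays off if the shards are reused or if the split can run where the data is produced.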
> >>>>>>
> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yifangc...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion?
> >>>>>>>
> >>>>>>> Re Reuven: No, I checked the run time in the Dataflow UI; the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.
> >>>>>>>
> >>>>>>> Thanks all,
> >>>>>>>
> >>>>>>> Allie
> >>>>>>>
> >>>>>>> From: Reuven Lax <re...@google.com>
> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
> >>>>>>> To: dev
> >>>>>>> Cc: user
> >>>>>>>
> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely
> that simply reading and decompressing all that data was very slow when
> there was no parallelism.
> >>>>>>>>
> >>>>>>>> From: Allie Chen <yifangc...@google.com>
> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
> >>>>>>>> To: <dev@beam.apache.org>
> >>>>>>>> Cc: <u...@beam.apache.org>
> >>>>>>>>
> >>>>>>>>> Yes, I do see that the data after the reshuffle is processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test I did (24 gzip files, 17 million lines in total).
> >>>>>>>>>
> >>>>>>>>> Our users' files are mostly in gzip format, since uncompressed files would be too costly to store (they could be hundreds of GB).
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Allie
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
> >>>>>>>>> To: dev, <u...@beam.apache.org>
> >>>>>>>>>
> >>>>>>>>>> +u...@beam.apache.org
> >>>>>>>>>>
> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits
> till all the data has been read before the next transforms can run. After
> the reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
> >>>>>>>>>>
> >>>>>>>>>> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
> yifangc...@google.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms, since Dataflow fuses all transforms together. There is a large amount of data in the file, and I expected to see more workers spinning up after the read transform. I tried to use the Reshuffle transform to prevent the fusion, but it is not scalable, since it won’t proceed until all data has arrived at this point.
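
A small standard-library illustration of why a single .gz file ends up on one reader. This is only a sketch, and the helper name and sample payload are made up for the example. A gzip decoder expects to start at the gzip header, and DEFLATE back-references point into previously decoded data, so a second reader cannot simply begin at an arbitrary byte offset.

```python
import gzip
import zlib


def can_decompress_from(data: bytes, offset: int) -> bool:
    """Return True if gzip decoding succeeds when started at `offset`."""
    try:
        gzip.decompress(data[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        # Raised when the slice does not begin with a valid gzip stream.
        return False


# Illustrative payload: 10,000 short text lines.
payload = b"".join(b"line %d\n" % i for i in range(10_000))
compressed = gzip.compress(payload)

# Decoding from the start of the stream versus from the middle of it.
print(can_decompress_from(compressed, 0))
print(can_decompress_from(compressed, len(compressed) // 2))
```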
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Are there any other ways to allow more workers to work on all the other transforms after reading?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Allie
> >>>>>>>>>>>
> >>>>>>>>>>>
>
