Thanks Robert. Yes, reading is the bottleneck, and we cannot do much better for gzip files; that's why we would like to at least parallelize the other transforms with the reading.
I tried the side input you suggested earlier to break the fusion, and it does a much better job than using Reshuffle! One test's running times, if anyone is interested:

- without any fusion break: 6 hours
- with Reshuffle: never ends; cancelled after running 6 hours, with about half the elements processed at the Reshuffle step
- with side input (not using --experiment=use_fastavro yet, I will try it later): 2 hours

Thanks all for your help!

Allie

*From: *Robert Bradshaw <rober...@google.com>
*Date: *Wed, May 15, 2019 at 3:34 PM
*To: *dev
*Cc: *user

On Wed, May 15, 2019 at 8:43 PM Allie Chen <yifangc...@google.com> wrote:
>
>> Thanks all for your replies. I will try each of them and see how it goes.
>>
>> The experiment I am working on now is similar to
>> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
>> which tries to get early results from GroupByKey with windowing. I have
>> some code like:
>>
>> Reading | beam.WindowInto(beam.window.GlobalWindows(),
>>             trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>>             accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
>>         | MapWithAKey
>>         | GroupByKey
>>         | RemoveKey
>>         | OtherTransforms
>>
>> I don't see the window and trigger working; GroupByKey still waits for
>> all elements. I also tried adding a timestamp to each element and using a
>> fixed-size window. That seems to have no impact either.
>>
>> Does anyone know how to get early results from GroupByKey for a bounded
>> source?
>
> Note that this is essentially how Reshuffle() is implemented. However,
> batch never gives early results from a GroupByKey; each stage is executed
> sequentially.
>
> Is the goal here to be able to parallelize the Read with other operations?
> If the Read (and limited-parallelism write) is still the bottleneck, that
> might not help much.