Reuven, I think you might be on to something. The Beam HadoopFileSystem copy() does indeed stream through the driver [1], and FileBasedSink.moveToOutputFiles() appears to use that method [2]. I'll cobble together a patched version that uses a rename() rather than a copy(), and report back findings before we consider the implications.
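For clarity, the change I plan to test amounts to the following (an illustrative sketch against the plain Hadoop FileSystem API rather than the actual Beam patch; the class and method names are mine):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Sketch contrasting the two ways of "moving" a finished temp file to its
// final destination on HDFS.
public class MoveSketch {

  // Roughly what the copy-then-delete in moveToOutputFiles() amounts to
  // today [2]: the file bytes are streamed through the caller.
  static void moveByCopy(FileSystem fs, Path src, Path dst) throws IOException {
    FileUtil.copy(fs, src, fs, dst, /* deleteSource= */ true, fs.getConf());
  }

  // What I want to test instead: rename() is a NameNode metadata operation,
  // so no bytes move at all.
  static void moveByRename(FileSystem fs, Path src, Path dst) throws IOException {
    if (!fs.rename(src, dst)) {
      throw new IOException("rename failed: " + src + " -> " + dst);
    }
  }
}

If the rename version closes most of the gap on the benchmarks, that would confirm the copy is the bottleneck.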
Thanks

[1] https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288

On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <timrobertson...@gmail.com> wrote:

> > Does HDFS support a fast rename operation?
>
> Yes. From the shell it is “mv” and in the Java API it is “rename(Path src, Path dst)”.
> I am not aware of a fast copy though. I think an HDFS copy streams the bytes through the driver (unless a distcp is issued, which is an MR job).
>
> (Thanks for engaging in this discussion, folks)
>
> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>
>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the temporary files to the final destination and then delete the temp files. Does HDFS support a fast rename operation? If so, I bet Spark is using that instead of paying the cost of copying the files.
>>
>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Ismaël, that should already be true. If not using dynamic destinations there might be some edges in the graph that are never used (i.e. no records are ever published on them), but that should not affect performance. If this is not the case we should fix it.
>>>
>>> Reuven
>>>
>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> The Spark runner uses the Spark broadcast mechanism to materialize the side input PCollections in the workers. I am not sure whether this is assigned in an optimal way, but it seems logical at least.
>>>>
>>>> Just wondering if we shouldn't first tackle the fact that if the pipeline does not have dynamic destinations (this case) WriteFiles should not be doing so much extra magic?
>>>>
>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>> >
>>>> > Often only the metadata (i.e. temp file names) is shuffled, except in the "spilling" case (which should only happen when using dynamic destinations).
>>>> >
>>>> > WriteFiles depends heavily on side inputs. How are side inputs implemented in the Spark runner?
>>>> >
>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <rober...@google.com> wrote:
>>>> >>
>>>> >> Yes, I stand corrected; dynamic writes are now much more than the primitive window-based naming we used to have.
>>>> >>
>>>> >> It would be interesting to visualize how much of this codepath is metadata vs. the actual data.
>>>> >>
>>>> >> In the case of file writing, it seems one could (maybe?) avoid requiring a stable input, as shards are accepted as a whole (unlike, say, sinks where a deterministic uid is needed for deduplication on retry).
>>>> >>
>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com> wrote:
>>>> >> >
>>>> >> > Robert - much of the complexity isn't due to streaming, but rather because WriteFiles supports "dynamic" output (where the user can choose a destination file based on the input record). In practice, if a pipeline is not using dynamic destinations the full graph is still generated, but much of that graph is never used (empty PCollections).
>>>> >> >
>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <rober...@google.com> wrote:
>>>> >> >>
>>>> >> >> I agree that this is concerning.
>>>> >> >> Some of the complexity may also have been introduced to accommodate writing files in streaming mode, but it seems we should be able to execute this as a single Map operation.
>>>> >> >>
>>>> >> >> Have you profiled to see which stages and/or operations are taking up the time?
>>>> >> >>
>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>> >> >> >
>>>> >> >> > Hi folks,
>>>> >> >> >
>>>> >> >> > I've recently been involved in projects rewriting Avro files and have discovered a concerning performance trait in Beam.
>>>> >> >> >
>>>> >> >> > I have observed Beam to be 6-20x slower than native Spark or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>> >> >> >
>>>> >> >> > - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 minutes with a map-only MR job
>>>> >> >> > - Rewriting a 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>>> >> >> >
>>>> >> >> > These tests were run with Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>> >> >> >
>>>> >> >> > I have only just started exploring, but I believe the cause is rooted in WriteFiles, which is used by all our file-based IO. WriteFiles is reasonably complex, with reshuffles, spilling to temporary files (presumably to accommodate varying bundle sizes / avoid small files), a union, a GBK, etc.
>>>> >> >> >
>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on whether we believe this is a concern (I do), whether we should explore optimisations, and any insight from previous work in this area.
>>>> >> >> >
>>>> >> >> > Thanks,
>>>> >> >> > Tim
>>>> >> >> >
>>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
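PS: for anyone wanting to reproduce, the benchmark pipeline in https://github.com/gbif/beam-perf/tree/master/avro-to-avro is essentially this shape (a trimmed sketch, not the exact test code; the schema and paths below are placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class AvroToAvro {
  public static void main(String[] args) {
    // Placeholder schema; substitute the schema of the files being rewritten.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
            + "[{\"name\":\"f\",\"type\":\"string\"}]}");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(AvroIO.readGenericRecords(schema).from("hdfs:///input/*.avro"))
        .apply(ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element()); // the real test modifies a field here
          }
        }))
        .setCoder(AvroCoder.of(schema))
        .apply(AvroIO.writeGenericRecords(schema).to("hdfs:///output/part"));
    p.run().waitUntilFinish();
  }
}

All of the WriteFiles machinery discussed above sits behind that final AvroIO.writeGenericRecords step.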