Thanks for linking this discussion with BEAM-5036 (and transitively to
BEAM-4861, which also comes into play), Jozek.

What Reuven speculated and Jozek had previously observed is indeed the
major cause. Today I've been testing the effect of performing the "move"
with rename() instead of copy() and delete().

My test environment is different today, but it still uses the 1.5TB input
data and the code I linked earlier in GH [1]:

  - Spark API: 35 minutes
  - Beam AvroIO (2.6.0): 1.7hrs
  - Beam AvroIO with rename() patch: 42 minutes

On the DAG linked in the GH repo [1], stages 3 & 4 are reduced to seconds,
saving 53 minutes over the Beam 2.6.0 version, which is the predominant
gain here.
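
For context, the pipeline being benchmarked is essentially "read Avro,
modify, write Avro". A minimal sketch of that shape (this is not the actual
code in [1]; the schema, paths and identity "modify" step are placeholders)
looks roughly like:

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.AvroCoder;
  import org.apache.beam.sdk.io.AvroIO;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.TypeDescriptor;

  // Minimal read-modify-write Avro pipeline sketch; the schema and paths
  // are placeholders and the "modify" step is just an identity map.
  public class AvroToAvroSketch {
    public static void main(String[] args) {
      Schema schema = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Rec\","
              + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      p.apply("Read", AvroIO.readGenericRecords(schema).from("hdfs:///input/*.avro"))
          .apply("Modify", MapElements
              .into(TypeDescriptor.of(GenericRecord.class))
              .via((GenericRecord r) -> r))
          .setCoder(AvroCoder.of(schema))
          .apply("Write", AvroIO.writeGenericRecords(schema).to("hdfs:///output/part"));
      p.run().waitUntilFinish();
    }
  }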

Unless new comments come in, I propose fixing BEAM-5036 and BEAM-4861 and
continuing the discussion on those Jiras.
This requires a bit of exploration and a decision on expectations (e.g.
what should happen when the target directory does not exist), and also
correcting the current incorrect use of the HDFS API (today the return
value, which can indicate errors such as a missing directory, is ignored).
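
For illustration only, here is a rough sketch (not the actual patch) of what
a "move" via the HDFS Java API could look like with the rename() return
value checked; the class, method and paths below are hypothetical:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Hypothetical helper, not Beam code: moves src to dst with rename()
  // rather than streaming a copy through the driver and deleting src.
  public class HdfsMoveSketch {

    static void move(Configuration conf, Path src, Path dst) throws IOException {
      FileSystem fs = FileSystem.get(conf);
      // rename() returns false on failure (e.g. the parent directory of dst
      // does not exist), so the return value must not be ignored.
      if (!fs.rename(src, dst)) {
        throw new IOException("Failed to rename " + src + " to " + dst);
      }
    }

    public static void main(String[] args) throws IOException {
      // Placeholder paths for illustration.
      move(new Configuration(),
          new Path("/tmp/beam-temp/part-00000.avro"),
          new Path("/data/output/part-00000.avro"));
    }
  }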

Thank you all for contributing to this discussion.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro



On Thu, Aug 23, 2018 at 11:55 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Just for reference, there is a JIRA open for
> FileBasedSink.moveToOutputFiles()  and filesystem move behavior
>
> https://issues.apache.org/jira/browse/BEAM-5036
>
>
> On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> Reuven, I think you might be on to something
>>
>> The Beam HadoopFileSystem copy() does indeed stream through the driver
>> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
>> [2].
>> I'll cobble together a patched version to test using a rename() rather
>> than a copy() and report back findings before we consider the implications.
>>
>> Thanks
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>>
>> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson <timrobertson...@gmail.com>
>> wrote:
>>
>>> > Does HDFS support a fast rename operation?
>>>
>>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>>> src, Path dst)”.
>>> I am not aware of a fast copy though. I think an HDFS copy streams the
>>> bytes through the driver (unless a distcp is issued which is a MR job).
>>>
>>> (Thanks for engaging in this discussion folks)
>>>
>>>
>>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>>> temporary files to the final destination and then delete the temp files.
>>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>>> instead of paying the cost of copying the files.
>>>>
>>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Ismael, that should already be true. If not using dynamic destinations
>>>>> there might be some edges in the graph that are never used (i.e. no 
>>>>> records
>>>>> are ever published on them), but that should not affect performance. If
>>>>> this is not the case we should fix it.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía <ieme...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>>>>> side input PCollections in the workers, not sure exactly if this is
>>>>>> efficiently assigned in an optimal way, but it seems logical at least.
>>>>>>
>>>>>> Just wondering if we shouldn't better first tackle the fact that if
>>>>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>>>>> should not be doing so much extra magic?
>>>>>>
>>>>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>> >
>>>>>> > Often only the metadata (i.e. temp file names) are shuffled, except
>>>>>> in the "spilling" case (which should only happen when using dynamic
>>>>>> destinations).
>>>>>> >
>>>>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>>>>> implemented in the Spark runner?
>>>>>> >
>>>>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw <
>>>>>> rober...@google.com> wrote:
>>>>>> >>
>>>>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>>>>> >> primitive window-based naming we used to have.
>>>>>> >>
>>>>>> >> It would be interesting to visualize how much of this codepath is
>>>>>> >> metadata vs. the actual data.
>>>>>> >>
>>>>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>>>>> >> requiring a stable input, as shards are accepted as a whole
>>>>>> (unlike,
>>>>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>>>>> >> retry).
>>>>>> >>
>>>>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax <re...@google.com>
>>>>>> wrote:
>>>>>> >> >
>>>>>> >> > Robert - much of the complexity isn't due to streaming, but
>>>>>> rather because WriteFiles supports "dynamic" output (where the user can
>>>>>> choose a destination file based on the input record). In practice if a
>>>>>> pipeline is not using dynamic destinations the full graph is still
>>>>>> generated, but much of that graph is never used (empty PCollections).
>>>>>> >> >
>>>>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>>>>> rober...@google.com> wrote:
>>>>>> >> >>
>>>>>> >> >> I agree that this is concerning. Some of the complexity may
>>>>>> have also
>>>>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>>>>> but it
>>>>>> >> >> seems we should be able to execute this as a single Map
>>>>>> operation.
>>>>>> >> >>
>>>>>> >> >> Have you profiled to see which stages and/or operations are
>>>>>> taking up the time?
>>>>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>>>>> >> >> <timrobertson...@gmail.com> wrote:
>>>>>> >> >> >
>>>>>> >> >> > Hi folks,
>>>>>> >> >> >
>>>>>> >> >> > I've recently been involved in projects rewriting Avro files
>>>>>> and have discovered a concerning performance trait in Beam.
>>>>>> >> >> >
>>>>>> >> >> > I have observed Beam between 6-20x slower than native Spark
>>>>>> or MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>>>>> >> >> >
>>>>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>>>>> Beam/Spark, 40 minutes with a map-only MR job
>>>>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>>>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>>>>> >> >> >
>>>>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x
>>>>>> clusters (Spark / YARN) on reference Dell / Cloudera hardware.
>>>>>> >> >> >
>>>>>> >> >> > I have only just started exploring but I believe the cause is
>>>>>> rooted in the WriteFiles which is used by all our file based IO. 
>>>>>> WriteFiles
>>>>>> is reasonably complex with reshuffles, spilling to temporary files
>>>>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>>>>> union, a GBK etc.
>>>>>> >> >> >
>>>>>> >> >> > Before I go too far with exploration I'd appreciate thoughts
>>>>>> on whether we believe this is a concern (I do), if we should explore
>>>>>> optimisations or any insight from previous work in this area.
>>>>>> >> >> >
>>>>>> >> >> > Thanks,
>>>>>> >> >> > Tim
>>>>>> >> >> >
>>>>>> >> >> > [1]
>>>>>> https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>>>>
>>>>>
