[
https://issues.apache.org/jira/browse/BEAM-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050688#comment-17050688
]
Emiliano Capoccia edited comment on BEAM-9434 at 3/5/20, 9:41 PM:
------------------------------------------------------------------
For the case outlined, a large number of very small (KB-sized) Avro files, the
idea is to expose a new hint in the AvroIO class that reads the input files
with a predetermined number of parallel tasks.
Both extremes, a very high or a very low number of tasks, should be avoided, as
both perform poorly: too many tasks lead to very high overhead, whereas too few
(or a single one) serialise the reading on a few nodes and leave the cluster
underutilised.
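As a minimal sketch of the idea only (this is not the code in the pull request, and it does not use any Beam API), the plain-Java example below spreads a list of file names over a fixed number of partitions round-robin. The class name FixedPartitionSketch, the method assign and the generated file names are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class FixedPartitionSketch {

    // Hypothetical helper: distributes file names over a fixed number of
    // buckets round-robin, so every bucket gets a near-equal share.
    public static List<List<String>> assign(List<String> files, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            buckets.get(i % numPartitions).add(files.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Made-up file names standing in for the matched S3 objects.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 6578; i++) {
            files.add("s3://my-bucket/path-to/file-" + i + ".avro");
        }
        List<List<String>> buckets = assign(files, 10);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + ": " + buckets.get(i).size() + " files");
        }
    }
}
```

With 6578 files and 10 partitions, each bucket ends up holding 657 or 658 files, so no single task has to read the whole input and the number of tasks stays small.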
In my tests I read 6578 Avro files from S3, each containing a single record.
With the proposed pull request #11037 and 10 partitions, the time to read the
files improved from 16 minutes to 2.3 minutes.
Even more importantly, the memory used by each node is roughly 1/10th of that
used in the single-node case.
*Reference run*: 6578 files, 1 task/executor, shuffle read 164 KB, 6578 records,
shuffle write 58 MB, 16 minutes execution time.
*PR #11037*: 10 tasks/executors, 660 files per task on average, totalling 6578;
23 KB average shuffle read per task, 6 MB average shuffle write per task, 2.3
minutes execution time per executor, running in parallel.
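The figures above can be cross-checked with a few lines of arithmetic. The sketch below only recomputes averages and the speedup from the numbers reported in the two runs; the class and method names are hypothetical.

```java
import java.util.Locale;

public class RunComparison {

    // Average share per task when a total is split evenly across tasks.
    public static double averagePerTask(double total, int tasks) {
        return total / tasks;
    }

    // Ratio between the reference execution time and the new one.
    public static double speedup(double referenceMinutes, double newMinutes) {
        return referenceMinutes / newMinutes;
    }

    public static void main(String[] args) {
        // Inputs taken from the two runs reported above.
        double filesPerTask = averagePerTask(6578, 10);
        double shuffleWritePerTaskMb = averagePerTask(58, 10);
        double gain = speedup(16.0, 2.3);

        System.out.println(String.format(Locale.ROOT,
            "files per task: %.1f", filesPerTask));
        System.out.println(String.format(Locale.ROOT,
            "shuffle write per task (MB): %.1f", shuffleWritePerTaskMb));
        System.out.println(String.format(Locale.ROOT,
            "speedup: %.1fx", gain));
    }
}
```

This gives about 658 files and 5.8 MB of shuffle write per task, in line with the rounded averages reported, and a speedup of roughly 7x on wall-clock time.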
> Performance improvements processing a large number of Avro files in S3+Spark
> ----------------------------------------------------------------------------
>
> Key: BEAM-9434
> URL: https://issues.apache.org/jira/browse/BEAM-9434
> Project: Beam
> Issue Type: Improvement
> Components: io-java-aws, sdk-java-core
> Affects Versions: 2.19.0
> Reporter: Emiliano Capoccia
> Assignee: Emiliano Capoccia
> Priority: Minor
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> There is a performance issue when processing a large number (tens of
> thousands or more) of small Avro files in Spark on K8S.
> The recommended way of reading a pattern of Avro files in Beam is by means of:
>
> {code:java}
> PCollection<AvroGenClass> records = p.apply(AvroIO.read(AvroGenClass.class)
>     .from("s3://my-bucket/path-to/*.avro").withHintMatchesManyFiles());
> {code}
> However, in the case of many small files, the above results in the entire
> read taking place in a single task/node, which is considerably slow and
> does not scale.
> Omitting the hint is not a viable option, as it results in too many tasks
> being spawned, leaving the cluster busy coordinating tiny tasks with high
> overhead.
> There are a few workarounds on the internet which mainly revolve around
> compacting the input files before processing, so that a reduced number of
> bulky files is processed in parallel.
>