On Fri, Sep 6, 2019 at 2:50 PM Sean Owen <sro...@gmail.com> wrote:

> I think the problem is calling globStatus to expand all 300K files.
> This is a general problem for object stores and huge numbers of files.
> Steve L. may have better thoughts on real solutions. But you might
> consider, if possible, running a lot of .csv jobs in parallel to query
> subsets of all the files, and union the results. At least there you
> parallelize the reading from the object store.
>

Yeah: avoid globs and small files, especially small files in deep trees.

>
> I think it's hard to optimize this case from the Spark side as it's
> not clear how big a glob like s3://foo/* is going to be. I think it
> would take reimplementing some logic to expand the glob incrementally
> or something. Or maybe I am overlooking optimizations that have gone
> into Spark 3.
>

A long time ago I actually tried to move FileSystem.globStatus off its own
recursive treewalk into supporting the option of flat-list-children +
filter. But while you can get some great speedups in some layouts, you can
get pathological collapses in perf elsewhere, which makes the people
running those queries very sad. So I gave up.

Parallelized scans can give a speedup; look at the code in
org.apache.hadoop.mapred.LocatedFileStatusFetcher to see what it does
there. I've only just started exploring what we can do to tune that, with
HADOOP-16458 and HADOOP-16465
<https://issues.apache.org/jira/browse/HADOOP-16465> (which should speed up
ORC/Parquet scans). These are designed to cut 1-2 HEAD requests off per
directory list, which may seem small but, from my early measurements, can be
significant.

That's why cutting things like an exists check makes a big difference,
especially if you are going to call some list() or open() operation
straight after: just call the operation and rely on the
FileNotFoundException to tell you when it's not there.
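
A minimal sketch of that pattern. It uses java.nio.file as a local stand-in
for the Hadoop FileSystem API, purely so that it runs without Hadoop on the
classpath; the class and method names are hypothetical. With Hadoop the same
shape would be a call to fs.open(path) wrapped in a catch of
FileNotFoundException.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Optional;

// Hypothetical helper: local-filesystem analogue of "open, don't probe".
final class OpenWithoutExists {

    /**
     * Reads the whole file, or returns empty if it is missing.
     * There is no exists() call first: the open itself reports absence,
     * so a present file costs one round trip instead of two.
     */
    static Optional<byte[]> readIfPresent(Path path) throws IOException {
        try (InputStream in = Files.newInputStream(path)) {
            return Optional.of(in.readAllBytes());
        } catch (NoSuchFileException e) {
            // java.nio reports a missing file with NoSuchFileException;
            // the Hadoop FileSystem API uses FileNotFoundException instead.
            return Optional.empty();
        }
    }
}
```

On an object store the saving is the probe that never gets sent, and it
applies to every file that does exist, which is normally nearly all of them.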

Now, looking at the code, if the list has already come from a real call to
globPath, then yes, the exists() call is wasteful, where waste = 500+ millis
per file:
http://steveloughran.blogspot.com/2016/12/how-long-does-filesystemexists-take.html

For a speedup, then:
* have SparkHadoopUtil differentiate between files returned
by globStatus(), which therefore exist, and those which it didn't glob
for; it will only need to check the latter.
* then worry about parallel execution of the scan, again
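
Roughly what those two steps could look like, as a sketch rather than the
actual Spark code. Everything here is hypothetical: the class name, the
isGlob() test (real glob syntax has more cases), and the glob/exists
callbacks, which stand in for FileSystem.globStatus() and FileSystem.exists()
so that the example runs without Hadoop or Spark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical resolver: only literal (non-glob) paths get an existence probe.
final class GlobAwareResolver {

    /** Simplified check for glob metacharacters. */
    static boolean isGlob(String path) {
        return path.chars().anyMatch(c -> "*?[{".indexOf(c) >= 0);
    }

    /**
     * Expands globs and validates literal paths, one task per input path,
     * on a fixed-size pool. Results keep the order of the input list.
     */
    static List<String> resolve(List<String> paths,
                                Function<String, List<String>> glob,
                                Predicate<String> exists,
                                int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (String p : paths) {
                Callable<List<String>> task = () -> {
                    if (isGlob(p)) {
                        // Entries came back from a listing, so they exist:
                        // no per-file probe is needed.
                        return glob.apply(p);
                    }
                    if (!exists.test(p)) {
                        throw new IllegalArgumentException("Path does not exist: " + p);
                    }
                    return List.of(p);
                };
                pending.add(pool.submit(task));
            }
            List<String> resolved = new ArrayList<>();
            for (Future<List<String>> f : pending) {
                resolved.addAll(f.get()); // a failed probe surfaces as ExecutionException
            }
            return resolved;
        } finally {
            pool.shutdown();
        }
    }
}
```

With one glob and one literal path as input, the exists callback fires once,
however many files the glob expands to.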

Why not file a JIRA on the Spark work? Send me a ref so I can look at your
patch.

One thing to know here is that the S3A FS class has counters for all
operations, which you can get from getStorageStatistics(); and if you call
toString() on it, it will print out the current stats. So you can just log
the fs string value before and after an operation and see what's gone on.
We track FS API calls (op_*) and actual HTTP requests to the store
(object_*); both are interesting. The object_* values show what is
expensive (and, in the S3A FS code, what we should cut); the op_* values
show which API calls are used a lot and should somehow be eliminated or, if
you have insights, optimised better. Removal is usually the best, as it
speeds up everything.
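
If reading two toString() dumps by eye gets tedious, the before/after
comparison can be done on counter snapshots instead. A small sketch, assuming
each snapshot is a name-to-value map; with Hadoop on the classpath such a map
could be filled by iterating fs.getStorageStatistics().getLongStatistics().
The StatsDiff class is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: reports which counters moved between two snapshots.
final class StatsDiff {

    /**
     * Returns after minus before for every counter whose value changed.
     * Counters absent from the first snapshot are treated as zero.
     * The result is sorted by name, so op_* and object_* entries group together.
     */
    static Map<String, Long> diff(Map<String, Long> before, Map<String, Long> after) {
        Map<String, Long> delta = new TreeMap<>();
        for (Map.Entry<String, Long> e : after.entrySet()) {
            long change = e.getValue() - before.getOrDefault(e.getKey(), 0L);
            if (change != 0) {
                delta.put(e.getKey(), change);
            }
        }
        return delta;
    }
}
```

Take one snapshot before the operation and one after, then log the diff:
the op_* entries are the API calls made, the object_* entries are the
requests they turned into.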

Long term, relying on directory trees to list your source data, and on
commit algorithms which move/instantiate changes, isn't sustainable. Things
like Apache Iceberg are where data should go ... things for which S3 can be
viewed as a fault-injecting test infrastructure. It's the Chaos Monkey of
object storage.


>
> On Fri, Sep 6, 2019 at 7:09 AM Arwin Tio <arwin....@hotmail.com> wrote:
> >
> > Hello,
> >
> > On Spark 2.4.4, I am using DataFrameReader#csv to read about 300000
> files on S3, and I've noticed that it takes about an hour for it to load
> the data on the Driver. You can see the timestamp difference when the log
> from InMemoryFileIndex occurs from 7:45 to 8:54:
> >
> > 19/09/06 07:44:42 INFO SparkContext: Running Spark version 2.4.4
> > 19/09/06 07:44:42 INFO SparkContext: Submitted application:
> LoglineParquetGenerator
> > ...
> > 19/09/06 07:45:40 INFO StateStoreCoordinatorRef: Registered
> StateStoreCoordinator endpoint
> > 19/09/06 08:54:57 INFO InMemoryFileIndex: Listing leaf files and
> directories in parallel under: [300K files...]
> >
> >
> > I believe that the issue comes from
> DataSource#checkAndGlobPathIfNecessary [0], specifically from when it is
> calling FileSystem#exists. Unlike bulkListLeafFiles, the existence check
> here happens in a single-threaded flatMap, which is a blocking network call
> if your files are stored on S3.
> >
> > I believe that there is a fairly straightforward opportunity for
> improvement here, which is to parallelize the existence check perhaps with
> a configurable number of threads. If that seems reasonable, I would like to
> create a JIRA ticket and submit a patch. Please let me know!
> >
> > Cheers,
> >
> > Arwin
> >
> > [0]
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L557
>
