[ https://issues.apache.org/jira/browse/SPARK-11441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15002751#comment-15002751 ]

koert kuipers commented on SPARK-11441:
---------------------------------------

Going over the code base, it seems that there are 2 features that spark-sql 
intends to support that cause this issue:
1) partition discovery
Partition discovery is done driver side and requires a full traversal of all 
subdirectories of the paths provided. Since the Hadoop FileSystem API does not 
support listing just directories, this basically means all files and 
directories need to be listed (see the listing sketch after this list).
2) schema merging
In various places (parquet, avro) the assumption is made that all part files 
can have different schemas, even if the part files sit in the same data 
directory.
Now it is highly unusual to have part files with different schemas within the 
same directory, due to the bulk write-once (immutable) nature of data on hadoop 
and spark, but with SaveMode.Append we did open the door to this (to me 
somewhat disturbing) possibility.
Once you allow every part file to have its own schema, you need to discover all 
these schemas driver side, which is why parquet runs an operation over all part 
files to merge their schemas (in parallel).
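
To illustrate 1), here is a minimal sketch (not the actual Spark code) of the 
kind of driver-side recursive listing that partition discovery implies; the 
dataset root path is a hypothetical placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// The FileSystem API only offers listStatus, which returns files and
// directories together, so finding the partition directories under a root
// also means touching every leaf file along the way.
def listLeafFiles(fs: FileSystem, path: Path): Seq[FileStatus] = {
  val (dirs, files) = fs.listStatus(path).partition(_.isDirectory)
  files ++ dirs.flatMap(d => listLeafFiles(fs, d.getPath))
}

val root = new Path("hdfs:///data/events")  // hypothetical dataset root
val fs   = root.getFileSystem(new Configuration())
val leafFiles = listLeafFiles(fs, root)     // cost grows with the number of part files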

Given the design choice of HadoopFsRelation I think 1) is pretty much 
unavoidable, but it can be made more efficient with tricks like a depth-first 
traversal to quickly find the subdirectory depth, followed by a single glob 
operation to find all the partitions.
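
For example, a rough sketch of that glob idea (a hypothetical helper, assuming 
all partition directories sit at the same depth under the root):

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Walk down a single branch to discover the partition depth, then resolve all
// partition directories with one glob call instead of recursively listing
// every file.
def findPartitionDirs(fs: FileSystem, root: Path): Array[Path] = {
  var depth = 0
  var subDirs = fs.listStatus(root).filter(_.isDirectory)
  while (subDirs.nonEmpty) {
    depth += 1
    subDirs = fs.listStatus(subDirs.head.getPath).filter(_.isDirectory)
  }
  if (depth == 0) Array(root)
  else {
    val pattern = new Path(root, Seq.fill(depth)("*").mkString("/"))  // e.g. root/*/*
    Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
      .filter(_.isDirectory).map(_.getPath)
  }
}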

To me 2) is basically a bad idea, but opinions can differ on this, and parquet 
also has the ability to turn this schema merging off (see the read option 
sketched after the list below). The main issue currently is that this schema 
merging option causes side effects in HadoopFsRelation that break scalability. 
I would argue that support for 2) should be part of ParquetRelation without 
impacting HadoopFsRelation negatively. What is the negative impact? Two things:
a) the FileStatusCache in HadoopFsRelation, with its leafFiles object, which 
does not scale and is not necessary if you don't do schema merging
b) the change in the buildScan API to take inputFiles: Array[FileStatus] (which 
must list all part files) instead of just paths: Array[String] (which lists the 
input paths, which can be directories).
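
For reference, the parquet toggle mentioned above looks roughly like this with 
the 1.5 DataFrame reader API (the path is a placeholder and an existing 
sqlContext is assumed):

// schema merging off for a single read; spark.sql.parquet.mergeSchema is the
// corresponding global setting
val df = sqlContext.read
  .option("mergeSchema", "false")
  .parquet("hdfs:///data/events")  // hypothetical path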

a) is a problem because it can easily slow down and blow out the driver program 
when reading any decent-sized dataset.
b) is a problem because it makes it impossible to know in buildScan what the 
original paths were. It is important to know the paths because a reasonable 
assumption is that the schema is consistent per path, so schemas can be 
discovered efficiently by exploiting this, or alternatively a DataFrame can be 
created per path and then unioned with the others to deal with varying schemas 
(sketched below). Currently none of this is possible anymore, unless you want 
to look at all the part files for schema discovery (probably not scalable) or 
create a DataFrame per part file (totally not scalable).
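
As an illustration of the per-path approach, a rough sketch that reads one 
DataFrame per data directory and unions them (paths are hypothetical, an 
existing sqlContext is assumed, and since unionAll in the 1.5 API resolves 
columns by position, a common column ordering is selected first):

// one DataFrame per directory, assuming the schema is consistent within each
// directory; reconcile the columns, then union
val paths   = Seq("hdfs:///data/events/v1", "hdfs:///data/events/v2")  // hypothetical
val perPath = paths.map(p => sqlContext.read.parquet(p))
val common  = perPath.map(_.columns.toSet).reduce(_ intersect _).toSeq
val merged  = perPath.map(_.selectExpr(common: _*)).reduce(_ unionAll _)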

> HadoopFsRelation is not scalable in number of files read/written
> ----------------------------------------------------------------
>
>                 Key: SPARK-11441
>                 URL: https://issues.apache.org/jira/browse/SPARK-11441
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>            Reporter: koert kuipers
>
> HadoopFsRelation includes a fileStatusCache which holds information on all 
> the data files (part files) for the data source in the driver program.
> It is not unusual to be reading from 100k+ or even 1 million part files, in 
> which case filling up this cache will take a very long time (days?) and 
> require a lot of memory. See for example:
> https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201510.mbox/%3CCAG+ckK-FvWK=1b2jqc4s+zaz2zvkqehvos9myo0ssmgpc5-...@mail.gmail.com%3E
> This is not the kind of behavior you would expect of a driver program. Also 
> HadoopFsRelation passes this large list of part files into:
> def buildScan(inputFiles: Array[FileStatus]): RDD[Row]
> Almost all implementations of HadoopFsRelation do the following inside 
> buildScan:
> FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
> This means an array of potentially millions of items now gets stored in the 
> JobConf, which will be broadcast. I have not found any errors about this on 
> the mailing list, but I believe this is simply because nobody with a large 
> number of inputFiles has gotten this far.
> Generally when using Hadoop InputFormats there should never be a need to 
> list all the part files driver side. It seems the reason it is done here is 
> to facilitate a process in ParquetRelation driver side that creates a merged 
> data schema. I wonder if it's really necessary to look at all the part files 
> for this, or if some assumption can be made that at least all the part files 
> in a directory have the same schema (which would reduce the size of the 
> problem by a factor of 100 or so).
> At the very least it seems that the caching of files is parquet specific and 
> does not belong in HadoopFsRelation. And buildScan should just use the data 
> paths (so directories, if one wants to read all part files in a directory) 
> as it did before SPARK-7673 / PR #6225.
> I ran into this issue myself with spark-avro, which also does not handle the 
> input of part files in buildScan well. Spark-avro actually tries to create an 
> RDD (and jobConf broadcast) per part file, which is not scalable even for 1k 
> part files. Note that it is difficult for spark-avro to create an RDD per 
> data directory (as it probably should), since the data paths have been lost 
> now that inputFiles is passed into buildScan instead. This to me again 
> confirms that the change in buildScan is troubling.


