koert kuipers created SPARK-11441:
-------------------------------------

             Summary: HadoopFsRelation is not scalable in the number of files read/written
                 Key: SPARK-11441
                 URL: https://issues.apache.org/jira/browse/SPARK-11441
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.1
            Reporter: koert kuipers


HadoopFsRelation includes a fileStatusCache that holds FileStatus information for all the data files (part files) of the data source in the driver program.
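
To make the cost concrete, here is a minimal sketch (my own illustration, not the actual Spark code) of what populating such a cache boils down to, using the standard Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Illustration only: enumerate every leaf (part) file under a data path.
// Each directory costs a blocking namenode / object-store listing call,
// and all of it runs in the driver.
def listLeafFiles(fs: FileSystem, path: Path): Seq[FileStatus] = {
  val (dirs, files) = fs.listStatus(path).toSeq.partition(_.isDirectory)
  files ++ dirs.flatMap(d => listLeafFiles(fs, d.getPath))
}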

It is not unusual to read from 100k+ or even 1 million part files, in which case filling this cache takes a very long time (days?) and requires a lot of driver memory. See for example:
https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201510.mbox/%3CCAG+ckK-FvWK=1b2jqc4s+zaz2zvkqehvos9myo0ssmgpc5-...@mail.gmail.com%3E

This is not the kind of behavior you would expect of a driver program. HadoopFsRelation also passes this large list of part files into:
def buildScan(inputFiles: Array[FileStatus]): RDD[Row]

Almost all implementations of HadoopFsRelation do the following inside 
buildScan:
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
This means an array of potentially millions of paths now gets stored in the JobConf, which is then broadcast. I have not found any errors about this on the mailing list, but I believe that is simply because nobody with a large number of inputFiles has gotten this far.
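
If I read FileInputFormat correctly, setInputPaths stores all the paths as a single comma-separated string in the job configuration. A rough back-of-envelope sketch (the 100-byte average path length is my assumption):

// Rough back-of-envelope; 100 bytes is a guess at an average path length.
val numFiles = 1000000   // 1 million part files
val avgPathLen = 100     // e.g. "hdfs://nn/warehouse/db/tbl/part-r-00001.gz.parquet"
val confValueBytes = numFiles.toLong * (avgPathLen + 1)   // +1 for the comma separator
println(s"~${confValueBytes / (1024 * 1024)} MB in a single JobConf entry")   // ~96 MB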

Generally, when using Hadoop InputFormats, there should never be a need to list all the part files driver-side. It seems the reason it is done here is to facilitate a driver-side process in ParquetRelation that creates a merged data schema. I wonder if it's really necessary to look at all the part files for this, or if one can assume that at least all the part files in a directory have the same schema (which would reduce the size of the problem by a factor of 100 or so).
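
A sketch of that shortcut; readFooterSchema and mergeSchemas are hypothetical stand-ins for whatever footer reading and schema merging ParquetRelation actually does:

import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical stand-ins for the real footer-reading / schema-merging logic.
case class Schema(fields: Set[String])
def readFooterSchema(file: Path): Schema = ???
def mergeSchemas(a: Schema, b: Schema): Schema = Schema(a.fields ++ b.fields)

// Assuming all part files in one directory share a schema, read one footer
// per directory instead of one per part file.
def mergedSchema(partFiles: Seq[FileStatus]): Schema =
  partFiles.groupBy(_.getPath.getParent)
    .map { case (_, files) => readFooterSchema(files.head.getPath) }
    .reduceLeft(mergeSchemas)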

At the very least it seems to me that the caching of file statuses is Parquet-specific and does not belong in HadoopFsRelation. And buildScan should just use the data paths (so directories, if one wants to read all part files in a directory) as it did before SPARK-7673 / PR #6225.
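
As a hypothetical sketch of that shape (the trait and signature are mine, not an actual Spark API):

import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

trait DirectoryBasedScan {
  // Hypothetical: hand the relation only the root data paths and let the
  // InputFormat expand directories into part files, keeping the JobConf small.
  def buildScan(dataPaths: Array[Path]): RDD[Row]

  // An implementation would then set just the directories:
  //   FileInputFormat.setInputPaths(job, dataPaths: _*)
}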

I ran into this issue myself with spark-avro, which also does not handle part files as buildScan input well. spark-avro actually tries to create an RDD (and a JobConf broadcast) per part file, which is not scalable even for 1k part files. Note that it is difficult for spark-avro to create an RDD per data directory (as it probably should) since the data paths have been lost by the time inputFiles is passed into buildScan. This again suggests to me that the change to buildScan is problematic.
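
Schematically (rddForPath is a hypothetical helper standing in for however the relation turns one path into an RDD):

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def rddForPath(path: Path): RDD[Row] = ???   // hypothetical helper

// What spark-avro effectively does today: one RDD (and one broadcast JobConf)
// per part file, i.e. a million RDDs for a million files.
def scanPerFile(inputFiles: Array[FileStatus]): RDD[Row] =
  inputFiles.map(f => rddForPath(f.getPath)).reduceLeft(_ union _)

// What it arguably should do: one RDD per data directory.
def scanPerDirectory(dataPaths: Array[Path]): RDD[Row] =
  dataPaths.map(rddForPath).reduceLeft(_ union _)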




