[ https://issues.apache.org/jira/browse/SPARK-27664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-27664:
------------------------------------

    Assignee: Apache Spark

> Performance issue with FileStatusCache, while reading from object stores.
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27664
>                 URL: https://issues.apache.org/jira/browse/SPARK-27664
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0, 2.4.3
>            Reporter: Prashant Sharma
>            Assignee: Apache Spark
>            Priority: Major
>
> In short,
> This issue (i.e. degraded performance) surfaces when the number of files is 
> large (> 100K) and the data is stored on an object store or other remote 
> storage. The root cause is that the entire listing is inserted as a single 
> entry into the FileStatusCache (a guava cache), and such an entry does not 
> fit unless the cache is configured to be very large, roughly 4X the data it 
> holds. Reason: [https://github.com/google/guava/issues/3462].
>  
> Full story, with possible solutions:
> When we read a directory in Spark with, e.g.,
> {code:java}
> spark.read.parquet("/dir/data/test").limit(1).show()
> {code}
> behind the scenes Spark fetches the FileStatus objects and caches them in a 
> FileStatusCache, so that it does not need to refetch them later. Internally, 
> the listing is done by the listLeafFiles function at the driver. Inside the 
> cache, the entire listing, as an array of FileStatus objects, is inserted as 
> a single entry with the key "/dir/data/test". The default size of this cache 
> is 250MB and it is configurable. The underlying cache is a guava cache.
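> For illustration only, a minimal sketch of such a weight-bounded guava cache 
> keyed by path (this is not Spark's actual FileStatusCache code; the class 
> name and the per-status weight estimate are assumptions made for the 
> example):
> {code:java}
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
> import com.google.common.cache.Weigher;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.Path;
>
> public class ListingCacheSketch {
>     // Assumed rough per-FileStatus weight; not a Spark constant.
>     private static final int ESTIMATED_BYTES_PER_STATUS = 1024;
>
>     // Path -> full listing, capped at ~250MB of estimated weight, as in the
>     // description above. The whole listing for one path is one entry, and
>     // the default concurrencyLevel of 4 is left untouched here.
>     private final Cache<Path, FileStatus[]> cache = CacheBuilder.newBuilder()
>         .maximumWeight(250L * 1024 * 1024)
>         .weigher(new Weigher<Path, FileStatus[]>() {
>             @Override
>             public int weigh(Path path, FileStatus[] statuses) {
>                 long estimate = (long) statuses.length * ESTIMATED_BYTES_PER_STATUS;
>                 return (int) Math.min(Integer.MAX_VALUE, estimate);
>             }
>         })
>         .build();
>
>     public void putLeafFiles(Path path, FileStatus[] statuses) {
>         cache.put(path, statuses);
>     }
>
>     public FileStatus[] getLeafFiles(Path path) {
>         return cache.getIfPresent(path);  // null on miss or after eviction
>     }
> }
> {code}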
> The guava cache has one interesting property: a single entry can only be as 
> large as
> {code:java}
> maximumSize/concurrencyLevel{code}
> (see [https://github.com/google/guava/issues/3462] for details). So for a 
> cache size of 250MB, a single entry can only be about 250MB/4, since the 
> default concurrency level in guava is 4. That is around 62MB, which is good 
> enough for most datasets, but not for directories with larger listings. The 
> effect is especially evident when such listings come from object stores like 
> Amazon S3 or IBM Cloud Object Storage, where redoing an evicted listing is 
> expensive.
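> The effect can be demonstrated with the guava cache API directly (a 
> standalone sketch with made-up numbers, not Spark code): an entry heavier 
> than maximumWeight/concurrencyLevel is evicted almost immediately, even 
> though the cache as a whole is nearly empty.
> {code:java}
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
> import com.google.common.cache.Weigher;
>
> public class GuavaSegmentLimitDemo {
>     public static void main(String[] args) {
>         // Total budget of 100 weight units split across 4 segments,
>         // so each segment holds at most ~100/4 = 25 units.
>         Cache<String, int[]> cache = CacheBuilder.newBuilder()
>             .maximumWeight(100)
>             .concurrencyLevel(4)
>             .weigher(new Weigher<String, int[]>() {
>                 @Override
>                 public int weigh(String key, int[] value) {
>                     return value.length;
>                 }
>             })
>             .build();
>
>         // Weight 20 fits in a single segment's ~25-unit budget and stays.
>         cache.put("small", new int[20]);
>         // Weight 40 exceeds the per-segment budget and is evicted right
>         // away, even though the total weight is far below 100.
>         cache.put("large", new int[40]);
>
>         System.out.println("small cached: " + (cache.getIfPresent("small") != null));
>         System.out.println("large cached: " + (cache.getIfPresent("large") != null));
>     }
> }
> {code}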
> So, currently one can work around this problem by setting the cache size 
> (i.e. `spark.sql.hive.filesourcePartitionFileCacheSize`) to a very high 
> value, as it needs to be much more than 4x what is actually required. But 
> this has a drawback: either the driver has to be started with more memory 
> than it really needs, or one risks an OOM when the cache does not evict 
> older entries because its size is configured at 4x.
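> For example, a hedged sketch of this workaround, raising the cache budget at 
> session start-up (the 1 GB figure is only an illustration, not a 
> recommendation):
> {code:java}
> import org.apache.spark.sql.SparkSession;
>
> // Oversize the FileStatusCache budget so that one large listing still fits
> // under maximumWeight / concurrencyLevel. The value is in bytes.
> SparkSession spark = SparkSession.builder()
>     .appName("large-listing-workaround")
>     .config("spark.sql.hive.filesourcePartitionFileCacheSize", 1024L * 1024 * 1024)
>     .getOrCreate();
>
> spark.read().parquet("/dir/data/test").limit(1).show();
> {code}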
> In order to fix this issue, we can take 3 different approaches:
> 1) One stop-gap fix is to reduce the concurrency level of the guava cache to 
> just 1; for a few entries of very large size, we do not lose much by doing 
> this.
> 2) An alternative would be to divide the input array into multiple entries 
> in the cache, instead of inserting everything against a single key. This can 
> be done by using directories as keys when there are multiple nested 
> directories, but if a user keeps everything under a single directory this 
> does not help either, and we cannot depend on users creating partitions in 
> their hive/sql table.
> 3) One more fix would be to make the concurrency level configurable for 
> those who want to change it, and, while inserting an entry into the cache, 
> split it into `concurrencyLevel` (or even 2X or 3X that many) parts before 
> inserting. This way the cache performs optimally, and even if there is an 
> eviction, only a part of the entries is evicted, as opposed to all of them 
> in the current implementation. How many entries are evicted due to size 
> depends on the configured concurrencyLevel. This approach can be taken even 
> without making `concurrencyLevel` configurable; a sketch of this chunked 
> insertion is shown after this list.
> The problem with this approach is that the chunks in the cache are of little 
> use on their own: if even one chunk is evicted, all the other chunks of that 
> key must be evicted as well, otherwise the results would be wrong. 
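> As an illustration of approach 3, a minimal, self-contained sketch (with 
> String in place of FileStatus, and a made-up chunk count and key scheme; not 
> Spark's actual code): the listing is split into a fixed number of chunks 
> stored under derived sub-keys, and a lookup only succeeds if every chunk is 
> still present, which is exactly the all-or-nothing constraint noted above.
> {code:java}
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
>
> public class ChunkedListingCache {
>     private static final int CHUNKS = 4;  // e.g. the cache's concurrencyLevel
>
>     private final Cache<String, String[]> cache =
>         CacheBuilder.newBuilder().maximumSize(10_000).build();
>
>     /** Split the listing into CHUNKS parts and store each under a sub-key. */
>     public void put(String path, String[] listing) {
>         int chunkSize = (listing.length + CHUNKS - 1) / CHUNKS;
>         for (int i = 0; i < CHUNKS; i++) {
>             int from = Math.min(i * chunkSize, listing.length);
>             int to = Math.min(from + chunkSize, listing.length);
>             cache.put(path + "#" + i, Arrays.copyOfRange(listing, from, to));
>         }
>     }
>
>     /** Return the full listing, or null if any chunk has been evicted. */
>     public String[] get(String path) {
>         List<String> all = new ArrayList<>();
>         for (int i = 0; i < CHUNKS; i++) {
>             String[] chunk = cache.getIfPresent(path + "#" + i);
>             if (chunk == null) {
>                 return null;  // a partial listing would give wrong results
>             }
>             all.addAll(Arrays.asList(chunk));
>         }
>         return all.toArray(new String[0]);
>     }
> }
> {code}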



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
