[ https://issues.apache.org/jira/browse/SPARK-27664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-27664:
------------------------------------

    Assignee: Apache Spark

Performance issue with FileStatusCache, while reading from object stores.
--------------------------------------------------------------------------

                 Key: SPARK-27664
                 URL: https://issues.apache.org/jira/browse/SPARK-27664
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.0, 2.4.3
            Reporter: Prashant Sharma
            Assignee: Apache Spark
            Priority: Major

In short:

This issue (degraded performance) surfaces when the number of files is large (> 100K) and the data is stored on an object store or other remote storage. The root cause is that the entire listing is inserted as a single entry into the FileStatusCache (a Guava cache), and an entry that large does not fit unless the cache is configured to be very large, roughly 4x what is actually needed. Reason: https://github.com/google/guava/issues/3462.

Full story, with possible solutions:

When we read a directory in Spark with

{code:java}
spark.read.parquet("/dir/data/test").limit(1).show()
{code}

behind the scenes Spark fetches the FileStatus objects and caches them in a FileStatusCache, so that it does not need to refetch them. Internally, it scans the directory using the listLeafFiles function at the driver. The entire content of the listing, as an array of FileStatus objects, is inserted into the FileStatusCache as a single entry, with "/dir/data/test" as the key. The default size of this cache is 250MB and it is configurable. The underlying cache is a Guava cache.

The Guava cache has one interesting property: a single entry can only be as large as

{code:java}
maximumSize / concurrencyLevel
{code}

(see https://github.com/google/guava/issues/3462 for details). So for a cache size of 250MB, a single entry can only be as large as 250MB/4, since the default concurrency level in Guava is 4. That is about 62MB, which is enough for most datasets, but it does not work well for directories with larger listings. The effect is especially evident when such listings come from object stores like Amazon S3 or IBM Cloud Object Storage.
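To make the entry-size limit concrete, here is a minimal, self-contained Scala sketch against Guava's CacheBuilder directly (not Spark's FileStatusCache); the path key and byte-array payload only stand in for the real Array[FileStatus] listing, and the 250MB size and concurrency level of 4 mirror the defaults described above.

{code:scala}
import com.google.common.cache.{CacheBuilder, Weigher}

object GuavaEntrySizeDemo {
  def main(args: Array[String]): Unit = {
    // Weigh each entry by the size of its payload, analogous to how the
    // FileStatusCache weighs the estimated size of the FileStatus array.
    val weigher = new Weigher[String, Array[Byte]] {
      override def weigh(key: String, value: Array[Byte]): Int = value.length
    }

    val cache = CacheBuilder.newBuilder()
      .concurrencyLevel(4)                   // Guava's default
      .maximumWeight(250L * 1024 * 1024)     // ~250MB, the default cache size
      .weigher(weigher)
      .build[String, Array[Byte]]()

    // A single ~100MB entry exceeds maximumWeight / concurrencyLevel (~62MB),
    // so it lands in one segment whose weight budget it blows and is evicted
    // almost immediately after being written.
    cache.put("/dir/data/test", new Array[Byte](100 * 1024 * 1024))
    println(cache.getIfPresent("/dir/data/test"))  // usually prints null
  }
}
{code}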
Currently, one can work around this problem by setting the cache size (`spark.sql.hive.filesourcePartitionFileCacheSize`) very high, since it needs to be more than 4x what is actually required. The drawback is that one either has to start the driver with more memory than needed, or risk an OOM when the cache does not evict older entries because its size is configured 4x too large.

To fix this issue, we can take 3 different approaches:

1) As a stop-gap fix, reduce the concurrency level of the Guava cache to 1. For a cache holding a few very large entries, we do not lose much by doing this.

2) Alternatively, divide the input array into multiple cache entries instead of inserting everything under a single key. This can be done using directories as keys when there are multiple nested directories under the listed directory, but if a user has everything listed under a single directory this does not help either, and we cannot rely on users creating partitions in their Hive/SQL tables.

3) Make the concurrency level configurable for those who want to change it, and when inserting an entry, divide it into `concurrencyLevel` (or even 2x or 3x that many) parts before insertion (see the sketch after this list). This way the cache performs optimally, and even if there is an eviction, only a part of the entries is evicted, as opposed to all of the entries in the current implementation. How many entries are evicted due to size depends on the configured concurrencyLevel. This approach can be taken even without making `concurrencyLevel` configurable. The problem with this approach is that the parts in the cache are of no use on their own: if even one part is evicted, all the parts for that key must be evicted as well, otherwise the results would be wrong.
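As a rough illustration of approach (3), here is a hedged Scala sketch of what splitting a listing into chunks could look like. The helper names putLeafFiles/getLeafFiles and the (Path, chunkIndex) key shape are hypothetical, not Spark's actual FileStatusCache API; the read path shows why every chunk for a path must still be present, otherwise the whole listing has to be treated as a miss to keep results correct.

{code:scala}
import com.google.common.cache.Cache
import org.apache.hadoop.fs.{FileStatus, Path}

object ChunkedListingSketch {
  // Split one path's listing into `parts` chunks keyed by (path, chunkIndex),
  // so that no single cache entry exceeds maximumWeight / concurrencyLevel.
  def putLeafFiles(
      cache: Cache[(Path, Int), Array[FileStatus]],
      path: Path,
      files: Array[FileStatus],
      parts: Int): Unit = {
    val chunkSize = math.max(1, math.ceil(files.length.toDouble / parts).toInt)
    // Always write exactly `parts` entries (padding with empty chunks) so the
    // read path knows how many chunks to look for.
    val chunks = files.grouped(chunkSize).toSeq.padTo(parts, Array.empty[FileStatus])
    chunks.zipWithIndex.foreach { case (chunk, idx) => cache.put((path, idx), chunk) }
  }

  // A listing is only usable if every chunk is still cached; if any chunk was
  // evicted, the listing is incomplete and must be treated as a miss. This is
  // the correctness concern noted for approach (3) above.
  def getLeafFiles(
      cache: Cache[(Path, Int), Array[FileStatus]],
      path: Path,
      parts: Int): Option[Array[FileStatus]] = {
    val chunks = (0 until parts).map(i => Option(cache.getIfPresent((path, i))))
    if (chunks.exists(_.isEmpty)) None
    else Some(chunks.flatten.flatten.toArray)
  }
}
{code}

Whether the chunking is keyed by sub-directory (approach 2) or by a fixed number of parts (approach 3), the read side needs this all-or-nothing check, which is why the simple approach (1) of lowering the concurrency level is attractive as a stop-gap.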