[ https://issues.apache.org/jira/browse/SPARK-36598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-36598:
------------------------------------
Assignee: (was: Apache Spark)
> Caches using the weight eviction mechanism risk a memory leak
> ---------------------------------------------------------------
>
> Key: SPARK-36598
> URL: https://issues.apache.org/jira/browse/SPARK-36598
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 3.3.0
> Reporter: Yang Jie
> Priority: Minor
> Attachments: image-2021-08-26-15-56-09-028.png
>
>
> In Spark, we define a Guava Cache that uses the weight eviction mechanism
> in 3 places:
> # ExternalShuffleBlockResolver#shuffleIndexCache
> # RemoteBlockPushResolver#indexCache
> # SharedInMemoryCache#cache
> These 3 Guava Caches risk a memory leak if `maximumWeight` is configured to
> 8589934592 (8g) or more, because LocalCache weight eviction does not work when
> maxSegmentWeight is >= Int.MAX_VALUE
> ([https://github.com/google/guava/issues/1761]). With the default concurrency
> level of 4, 8g gives a per-segment limit of about 8589934592 / 4 = 2147483648,
> which already exceeds Int.MAX_VALUE (2147483647).
> The following UT reproduces this issue:
> {code:java}
> @Test
> public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException {
>   Map<String, String> config = new HashMap<>();
>   String indexCacheSize = "8192m";
>   config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
>   TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config));
>   ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null);
>   // dataContext and SORT_MANAGER are assumed to be fixtures of the enclosing test suite.
>   resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
>   // Requires widening the access scope of ExternalShuffleBlockResolver.shuffleIndexCache.
>   LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache = resolver.shuffleIndexCache;
>   // 8g -> 8589934592 bytes
>   long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize);
>   int unitSize = 1048575;
>   // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
>   int concurrencyLevel = 4;
>   int totalGetCount = 16384;
>   // maxCacheCount is 8192 (2048 entries per segment * 4 segments)
>   long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel;
>   for (int i = 0; i < totalGetCount; i++) {
>     File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index");
>     ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class);
>     Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
>     shuffleIndexCache.get(indexFile, () -> indexInfo);
>   }
>   long totalWeight =
>     shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum();
>   long size = shuffleIndexCache.size();
>   try {
>     Assert.assertTrue(size <= maxCacheCount);
>     Assert.assertTrue(totalWeight < maximumWeight);
>     Assert.fail("The test code should not reach this line while the bug exists.");
>   } catch (AssertionError error) {
>     // The code enters this branch because LocalCache weight eviction does not work
>     // when maxSegmentWeight is >= Int.MAX_VALUE.
>     Assert.assertTrue(size > maxCacheCount && size <= totalGetCount);
>     Assert.assertTrue(totalWeight > maximumWeight);
>   }
> }
> {code}
>
> From the debug view, we found that 2 segments have a negative
> `segment.totalWeight` value:
>
> !image-2021-08-26-15-56-09-028.png!
>
>
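To illustrate why eviction stops, here is a minimal, hypothetical simulation of a single cache segment. It is not Guava's LocalCache code. It assumes, consistent with the negative `totalWeight` values in the debug view, that the segment keeps its running weight in a 32-bit int, and that the 16384 entries from the UT spread evenly over the 4 segments (4096 each). Because the per-segment limit (8589934592 / 4 = 2147483648) is larger than `Integer.MAX_VALUE`, an int total can never exceed it, so the eviction loop never runs and the total simply wraps around to negative values.

```java
// Hypothetical simulation of ONE cache segment; not Guava's actual LocalCache code.
// Assumption: the segment's running weight is a 32-bit int, as the negative
// totalWeight values in the debug view suggest.
public class SegmentWeightOverflowDemo {
    static final long MAXIMUM_WEIGHT = 8L * 1024 * 1024 * 1024;            // 8589934592 bytes ("8192m")
    static final int SEGMENT_COUNT = 4;                                    // default concurrency level
    static final long MAX_SEGMENT_WEIGHT = MAXIMUM_WEIGHT / SEGMENT_COUNT; // 2147483648 > Integer.MAX_VALUE
    static final int UNIT_SIZE = 1048575;                                  // entry weight used in the UT

    int totalWeight = 0; // int running total: silently wraps on overflow
    int entryCount = 0;
    int evictions = 0;

    void add(int weight) {
        totalWeight += weight;
        entryCount++;
        // Weight-based eviction: evict while over budget. Since totalWeight is an int
        // and MAX_SEGMENT_WEIGHT > Integer.MAX_VALUE, this condition can never be true.
        while (totalWeight > MAX_SEGMENT_WEIGHT && entryCount > 0) {
            totalWeight -= weight; // all entries share the same weight in this demo
            entryCount--;
            evictions++;
        }
    }

    public static void main(String[] args) {
        SegmentWeightOverflowDemo segment = new SegmentWeightOverflowDemo();
        // 16384 gets spread evenly over 4 segments -> 4096 entries per segment.
        for (int i = 0; i < 16384 / SEGMENT_COUNT; i++) {
            segment.add(UNIT_SIZE);
        }
        long trueWeight = (long) segment.entryCount * UNIT_SIZE;
        System.out.println("entries=" + segment.entryCount + " evictions=" + segment.evictions);
        System.out.println("int totalWeight=" + segment.totalWeight
            + " true weight=" + trueWeight + " budget=" + MAX_SEGMENT_WEIGHT);
    }
}
```

Under these assumptions, `main` reports 4096 entries, 0 evictions and an int `totalWeight` of -4096, while the true weight is 4294963200, almost twice the segment budget. Note that the int total first goes negative on the 2049th entry, just past the 2048 entries per segment that the UT's `maxCacheCount` allows.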
--
This message was sent by Atlassian Jira
(v8.3.4#803005)