Yang Jie created SPARK-36598:
--------------------------------
Summary: The Cache construct with weight eviction has risk of memory leakage
Key: SPARK-36598
URL: https://issues.apache.org/jira/browse/SPARK-36598
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 3.3.0
Reporter: Yang Jie
Spark defines a Guava Cache with weight-based eviction in 3 places:
# ExternalShuffleBlockResolver#shuffleIndexCache
# RemoteBlockPushResolver#indexCache
# SharedInMemoryCache#cache
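These 3 Guava caches can leak memory if `maximumWeight` is configured to be >= 8589934592 (8g), because LocalCache weight eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE (see [https://github.com/google/guava/issues/1761]). With the default concurrency level of 4, the weight is split across 4 segments, so 8g gives a maxSegmentWeight of 8589934592 / 4 = 2147483648, one more than Int.MAX_VALUE (2147483647).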
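The threshold can be checked with a few lines of arithmetic. The sketch below is a simplified, hypothetical model (the class and method names are illustrative, not Guava's): it assumes the total weight is split evenly across segments, ignoring Guava's rounding details, and that each segment compares a running total that can never exceed Int.MAX_VALUE against that limit. Under those assumptions, a "total > limit" check can only fire while the limit is below Int.MAX_VALUE.

```java
// Hypothetical, simplified model of how a Guava-style LocalCache splits
// maximumWeight across segments. This is not Guava's actual code.
public class SegmentWeightSketch {

    // Approximate per-segment limit: the total budget divided evenly across
    // segments (ignores Guava's small rounding adjustment).
    static long perSegmentLimit(long maximumWeight, int segmentCount) {
        return maximumWeight / segmentCount;
    }

    // A running total that never exceeds Integer.MAX_VALUE can only satisfy
    // "total > limit" when the limit itself is below Integer.MAX_VALUE.
    static boolean limitReachableByInt(long limit) {
        return limit < Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        int segmentCount = 4; // assumes CacheBuilder's default concurrency level
        long[] budgets = {4096L * 1024 * 1024, 8192L * 1024 * 1024}; // 4g and 8g
        for (long budget : budgets) {
            long limit = perSegmentLimit(budget, segmentCount);
            System.out.println(budget + " -> per-segment " + limit
                + ", eviction reachable with int total: " + limitReachableByInt(limit));
        }
    }
}
```

With a 4g budget, each segment's limit is 1073741824 and eviction can trigger. With 8g, it is 2147483648, which an int total can never exceed.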
The following unit test reproduces the issue:
{code:java}
@Test
public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException {
  // Assumes the enclosing test class provides the `dataContext` and `SORT_MANAGER` fixtures.
  Map<String, String> config = new HashMap<>();
  String indexCacheSize = "8192m";
  config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
  TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config));
  ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null);
  resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
  LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache = resolver.shuffleIndexCache;
  // 8g -> 8589934592 bytes
  long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize);
  int unitSize = 1048575;
  // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
  int concurrencyLevel = 4;
  int totalGetCount = 16384;
  // maxCacheCount is 8192: each of the 4 segments can hold 2048 entries of unitSize
  long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel;
  for (int i = 0; i < totalGetCount; i++) {
    File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index");
    ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class);
    Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
    shuffleIndexCache.get(indexFile, () -> indexInfo);
  }
  long totalWeight =
    shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum();
  long size = shuffleIndexCache.size();
  // With working weight eviction we would expect size <= maxCacheCount and
  // totalWeight < maximumWeight. Neither holds, because LocalCache weight
  // eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE,
  // so the test asserts the leaking behavior instead.
  Assert.assertTrue(size > maxCacheCount && size <= totalGetCount);
  Assert.assertTrue(totalWeight > maximumWeight);
}
{code}
The debug view also shows that segment.totalWeight is negative for 2 of the segments:
!image-2021-08-26-15-51-40-532.png!
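Negative totals are what an int running weight produces when it wraps on overflow. The sketch below assumes, hypothetically, that each segment accumulates entry weights in a Java int; the class and method names are illustrative and not Guava's. Using the test's unitSize of 1048575, the int total turns negative after 2049 entries. If the 16384 gets were spread evenly (an assumption) at about 4096 entries per segment, the wrapped total would read -4096.

```java
// Hypothetical model: a segment keeps its running weight in an int,
// which is consistent with the negative segment.totalWeight values.
public class IntWeightOverflowSketch {

    // Adds `entries` weights of `unitSize` to an int total, as a segment
    // would on each insert; Java int addition wraps silently on overflow.
    static int accumulate(int entries, int unitSize) {
        int totalWeight = 0;
        for (int i = 0; i < entries; i++) {
            totalWeight += unitSize;
        }
        return totalWeight;
    }

    // Number of entries after which the int total first becomes negative.
    // Requires unitSize > 0, otherwise the loop would never terminate.
    static int firstNegativeCount(int unitSize) {
        int totalWeight = 0;
        int count = 0;
        while (totalWeight >= 0) {
            totalWeight += unitSize;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int unitSize = 1048575; // same per-entry weight as the unit test
        System.out.println(firstNegativeCount(unitSize)); // 2049
        System.out.println(accumulate(4096, unitSize));   // -4096
    }
}
```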
--
This message was sent by Atlassian Jira
(v8.3.4#803005)