[ https://issues.apache.org/jira/browse/SPARK-36598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Jie updated SPARK-36598:
-----------------------------
    Description: 
In Spark, we define a Guava Cache in 3 places that use the weight-based eviction mechanism:
 # ExternalShuffleBlockResolver#shuffleIndexCache
 # RemoteBlockPushResolver#indexCache
 # SharedInMemoryCache#cache

These 3 Guava Caches risk memory leakage if `maximumWeight` is configured >= 8589934592 (8g), because LocalCache weight eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE ([https://github.com/google/guava/issues/1761]).
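Why 8g is the threshold: with CacheBuilder's default concurrencyLevel of 4, an 8g maximumWeight leaves each segment a budget of 2147483648 bytes, one more than Int.MAX_VALUE. A minimal arithmetic sketch (assuming, as the UT below does, that maximumWeight is split evenly across concurrencyLevel segments; this is a simplification of Guava's internals):

{code:java}
// Arithmetic sketch only, not Guava code: with maximumWeight split evenly
// across the default 4 segments, the per-segment budget of an 8g cache
// no longer fits in a signed 32-bit int.
public class SegmentWeightSketch {
  public static void main(String[] args) {
    long maximumWeight = 8L * 1024 * 1024 * 1024; // "8192m" -> 8589934592 bytes
    int concurrencyLevel = 4;                     // CacheBuilder default
    long maxSegmentWeight = maximumWeight / concurrencyLevel;
    System.out.println(maxSegmentWeight);                     // 2147483648
    System.out.println(maxSegmentWeight > Integer.MAX_VALUE); // true
  }
}
{code}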

A UT that reproduces this issue is as follows:
{code:java}
@Test
public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException {
  Map<String, String> config = new HashMap<>();
  String indexCacheSize = "8192m";
  config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
  TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config));
  ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null);
  resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));

  // Needs the access scope of ExternalShuffleBlockResolver.shuffleIndexCache widened.
  LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache = resolver.shuffleIndexCache;

  // 8g -> 8589934592 bytes
  long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize);
  int unitSize = 1048575;
  // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
  int concurrencyLevel = 4;
  int totalGetCount = 16384;
  // maxCacheCount is 8192
  long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel;
  for (int i = 0; i < totalGetCount; i++) {
    File indexFile = new File("shuffle_0_" + i + "_0.index");
    ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class);
    Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
    shuffleIndexCache.get(indexFile, () -> indexInfo);
  }

  long totalWeight =
    shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum();
  long size = shuffleIndexCache.size();
  try {
    Assert.assertTrue(size <= maxCacheCount);
    Assert.assertTrue(totalWeight < maximumWeight);
    fail("The test code should not reach this line.");
  } catch (AssertionError error) {
    // The code enters this branch because LocalCache weight eviction does not work
    // when maxSegmentWeight is >= Int.MAX_VALUE.
    Assert.assertTrue(size > maxCacheCount && size <= totalGetCount);
    Assert.assertTrue(totalWeight > maximumWeight);
  }
}
{code}
 

From the debug view, we found that 2 of the segments' totalWeight values are negative:

 

!image-2021-08-26-15-56-09-028.png!  
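The negative values are consistent with the segment weight being tracked in a 32-bit counter: once a segment accumulates more than Integer.MAX_VALUE of weight, the counter wraps negative and the eviction condition can never fire. A minimal sketch of the wrap-around using the UT's numbers (16384 entries of 1048575 bytes spread over 4 segments; this models the overflow, it is not Guava code):

{code:java}
// Sketch of the int overflow: accumulating the UT's per-entry weight
// in a 32-bit counter wraps negative once the sum passes Integer.MAX_VALUE.
public class TotalWeightOverflow {
  public static void main(String[] args) {
    int unitSize = 1048575;            // weight of each mocked ShuffleIndexInformation
    int entriesPerSegment = 16384 / 4; // 16384 gets spread over 4 segments
    int totalWeight = 0;               // 32-bit counter
    for (int i = 0; i < entriesPerSegment; i++) {
      totalWeight += unitSize;         // silently overflows past Integer.MAX_VALUE
    }
    System.out.println(totalWeight);     // -4096: wrapped to a negative value
    System.out.println(totalWeight < 0); // true
  }
}
{code}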

 

 


> The Cache use weight eviction mechanism has risk of memory leakage
> ------------------------------------------------------------------
>
>                 Key: SPARK-36598
>                 URL: https://issues.apache.org/jira/browse/SPARK-36598
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.3.0
>            Reporter: Yang Jie
>            Priority: Minor
>         Attachments: image-2021-08-26-15-56-09-028.png
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
