This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f347338 [ISSUE-228][FEATURE] Add a period local storage cleaner 
thread (#357)
2f347338 is described below

commit 2f3473381d5524aa15696eab3b1623dcb5ddf2b6
Author: sfwang218 <[email protected]>
AuthorDate: Fri Nov 25 21:24:52 2022 +0800

    [ISSUE-228][FEATURE] Add a period local storage cleaner thread (#357)
    
    ### What changes were proposed in this pull request?
    Add a periodic local storage cleaner thread to check for leaked shuffle data on 
disks
    
    ### Why are the changes needed?
    Because we don't use a strict lock strategy, some shuffle data may not be 
deleted normally. If we used a strict lock strategy, performance would be bad. So we 
want to add a periodic local storage cleaner thread. The thread examines which 
applications are on the disk and checks whether each app exists in the in-memory data 
structure ShuffleTaskManager#shuffleTaskInfos. If the app does not exist in that 
structure, we will delete its data on the disk. This relates to issue #282.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    tests were added
    
    Co-authored-by: wangshifa <[email protected]>
---
 docs/server_guide.md                               |  1 +
 .../apache/uniffle/server/ShuffleServerConf.java   |  6 ++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 19 +++++
 .../uniffle/server/storage/HdfsStorageManager.java |  5 ++
 .../server/storage/LocalStorageManager.java        | 27 +++++++
 .../server/storage/MultiStorageManager.java        |  6 ++
 .../uniffle/server/storage/StorageManager.java     |  5 +-
 .../uniffle/server/ShuffleTaskManagerTest.java     | 89 ++++++++++++++++++++++
 .../uniffle/storage/common/LocalStorage.java       | 15 ++++
 9 files changed, 172 insertions(+), 1 deletion(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index efdf767a..871aa237 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -83,3 +83,4 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 |rss.server.single.buffer.flush.threshold|64M|The threshold of single shuffle 
buffer flush|
 |rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If 
it's negative, it will use the default disk whole space|
 |rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for 
`MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If 
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` 
will be used.|
+|rss.server.leak.shuffledata.check.interval|3600000|The interval of leak 
shuffle data check (ms)|
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index b8bdf7ac..52acc549 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -308,6 +308,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("The timeout of the cache which record the mapping 
information");
 
+  public static final ConfigOption<Long> 
SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL = ConfigOptions
+          .key("rss.server.leak.shuffledata.check.interval")
+          .longType()
+          .defaultValue(3600 * 1000L)
+          .withDescription("the interval of leak shuffle data check");
+
   public ShuffleServerConf() {
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index cea1dcb6..6fe1e412 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -70,12 +70,14 @@ public class ShuffleTaskManager {
   private final ShuffleFlushManager shuffleFlushManager;
   private final ScheduledExecutorService scheduledExecutorService;
   private final ScheduledExecutorService expiredAppCleanupExecutorService;
+  private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
   private final StorageManager storageManager;
   private AtomicLong requireBufferId = new AtomicLong(0);
   private ShuffleServerConf conf;
   private long appExpiredWithoutHB;
   private long preAllocationExpired;
   private long commitCheckIntervalMax;
+  private long leakShuffleDataCheckInterval;
   // appId -> shuffleId -> blockIds to avoid too many appId
   // store taskAttemptId info to filter speculation task
   // Roaring64NavigableMap instance will cost much memory,
@@ -102,6 +104,7 @@ public class ShuffleTaskManager {
     this.appExpiredWithoutHB = 
conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
     this.commitCheckIntervalMax = 
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
     this.preAllocationExpired = 
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
+    this.leakShuffleDataCheckInterval = 
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
     // the thread for checking application status
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.getThreadFactory("checkResource-%d"));
@@ -113,6 +116,11 @@ public class ShuffleTaskManager {
     expiredAppCleanupExecutorService.scheduleAtFixedRate(
         () -> checkResourceStatus(), appExpiredWithoutHB / 2,
         appExpiredWithoutHB / 2, TimeUnit.MILLISECONDS);
+    this.leakShuffleDataCheckExecutorService = 
Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("leakShuffleDataChecker"));
+    leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
+        () -> checkLeakShuffleData(), leakShuffleDataCheckInterval,
+            leakShuffleDataCheckInterval, TimeUnit.MILLISECONDS);
     // the thread for clear expired resources
     clearResourceThread = () -> {
       while (true) {
@@ -452,6 +460,17 @@ public class ShuffleTaskManager {
         appId, shuffleIds, System.currentTimeMillis() - start);
   }
 
+  public void checkLeakShuffleData() {
+    LOG.info("Start check leak shuffle data");
+    try {
+      Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
+      storageManager.checkAndClearLeakShuffleData(appIds);
+      LOG.info("Finish check leak shuffle data");
+    } catch (Exception e) {
+      LOG.warn("Error happened in checkLeakShuffleData", e);
+    }
+  }
+
   @VisibleForTesting
   public void removeResources(String appId) {
     LOG.info("Start remove resource for appId[" + appId + "]");
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 3087952e..2151e75d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server.storage;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -134,6 +135,10 @@ public class HdfsStorageManager extends 
SingleStorageManager {
     appIdToStorages.putIfAbsent(appId, pathToStorages.get(remoteStorage));
   }
 
+  @Override
+  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+  }
+
   public HdfsStorage getStorageByAppId(String appId) {
     if (!appIdToStorages.containsKey(appId)) {
       synchronized (this) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4ba3fd3b..db0f47e3 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -19,7 +19,9 @@ package org.apache.uniffle.server.storage;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -62,6 +64,7 @@ import static 
org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALI
 
 public class LocalStorageManager extends SingleStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalStorageManager.class);
+  private static final String UNKNOWN_USER_NAME = "unknown";
 
   private final List<LocalStorage> localStorages;
   private final List<String> storageBasePaths;
@@ -218,6 +221,30 @@ public class LocalStorageManager extends 
SingleStorageManager {
     // ignore
   }
 
+  @Override
+  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+    Set<String> appIdsOnStorages = new HashSet<>();
+    for (LocalStorage localStorage : localStorages) {
+      if (!localStorage.isCorrupted()) {
+        Set<String> appIdsOnStorage = localStorage.getAppIds();
+        appIdsOnStorages.addAll(appIdsOnStorage);
+      }
+    }
+
+    for (String appId : appIdsOnStorages) {
+      if (!appIds.contains(appId)) {
+        ShuffleDeleteHandler deleteHandler = 
ShuffleHandlerFactory.getInstance()
+            .createShuffleDeleteHandler(
+               new 
CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new 
Configuration()));
+        String[] deletePaths = new String[storageBasePaths.size()];
+        for (int i = 0; i < storageBasePaths.size(); i++) {
+          deletePaths[i] = 
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths.get(i), appId);
+        }
+        deleteHandler.delete(deletePaths, appId, UNKNOWN_USER_NAME);
+      }
+    }
+  }
+
   void repair() {
     boolean hasNewCorruptedStorage = false;
     for (LocalStorage storage : localStorages) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 942ae5bb..adb84369 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.storage;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.cache.Cache;
@@ -159,6 +160,11 @@ public class MultiStorageManager implements StorageManager 
{
     return warmStorageManager.canWrite(event) || 
coldStorageManager.canWrite(event);
   }
 
+  @Override
+  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+    warmStorageManager.checkAndClearLeakShuffleData(appIds);
+  }
+
   public void removeResources(PurgeEvent event) {
     LOG.info("Start to remove resource of {}", event);
     warmStorageManager.removeResources(event);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index a46bea29..2ba7b4c5 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.util.Collection;
+
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -25,7 +27,6 @@ import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 
-
 public interface StorageManager {
 
   Storage selectStorage(ShuffleDataFlushEvent event);
@@ -51,4 +52,6 @@ public interface StorageManager {
   boolean canWrite(ShuffleDataFlushEvent event);
 
   // todo: add an interface that check storage isHealthy
+
+  void checkAndClearLeakShuffleData(Collection<String> appIds);
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 99b79327..90b67a94 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
@@ -49,8 +51,10 @@ import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
@@ -633,6 +637,91 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     }
   }
 
+  @Test
+  public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws 
Exception {
+    final String appId = "clearLocalTest_appId";
+
+    ShuffleServerConf conf = new ShuffleServerConf();
+    final int shuffleId = 1;
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 64L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(tempDir.getAbsolutePath()));
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
+    // make sure not to check leak shuffle data automatically
+    conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 
600 * 1000L);
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = 
shuffleServer.getShuffleTaskManager();
+    shuffleTaskManager.registerShuffle(
+            appId,
+            shuffleId,
+            Lists.newArrayList(new PartitionRange(0, 1)),
+            RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+            StringUtils.EMPTY
+    );
+
+    shuffleTaskManager.refreshAppId(appId);
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48);
+
+    // make sure shuffle data flush to disk
+    int retry = 0;
+    while (retry < 5) {
+      Thread.sleep(1000);
+      shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, 
shuffleData);
+      shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
shuffleData.getBlockList());
+      shuffleTaskManager.refreshAppId(appId);
+      shuffleTaskManager.checkResourceStatus();
+
+      retry++;
+    }
+
+    StorageManager storageManager = shuffleServer.getStorageManager();
+    assertTrue(storageManager instanceof LocalStorageManager);
+    LocalStorageManager localStorageManager = (LocalStorageManager) 
storageManager;
+    // parse appIds from storage
+    Set<String> appIdsOnDisk = getAppIdsOnDisk(localStorageManager);
+    assertEquals(appIdsOnDisk.size(), shuffleTaskManager.getAppIds().size());
+    assertTrue(appIdsOnDisk.contains(appId));
+
+    // make sure heartbeat timeout and resources are removed
+    Thread.sleep(5000);
+
+    appIdsOnDisk = getAppIdsOnDisk(localStorageManager);
+    assertFalse(appIdsOnDisk.contains(appId));
+
+    // mock leak shuffle data
+    File file = new File(tempDir, appId);
+    assertFalse(file.exists());
+    file.mkdir();
+    assertTrue(file.exists());
+
+    // execute checkLeakShuffleData
+    shuffleTaskManager.checkLeakShuffleData();
+    assertFalse(file.exists());
+  }
+
+  private Set<String> getAppIdsOnDisk(LocalStorageManager localStorageManager) 
{
+    Set<String> appIdsOnDisk = new HashSet<>();
+
+    List<LocalStorage> storages = localStorageManager.getStorages();
+    for (LocalStorage storage : storages) {
+      appIdsOnDisk.addAll(storage.getAppIds());
+    }
+
+    return appIdsOnDisk;
+  }
+
   // copy from ClientUtils
   private Long getBlockId(long partitionId, long taskAttemptId, long 
atomicInt) {
     return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + 
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 19c886ee..4032f3b1 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.common;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -316,6 +317,20 @@ public class LocalStorage extends AbstractStorage {
     isCorrupted = true;
   }
 
+  public Set<String> getAppIds() {
+    Set<String> appIds = new HashSet<>();
+    File baseFolder = new File(basePath);
+    File[] files = baseFolder.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        if (file.isDirectory()) {
+          appIds.add(file.getName());
+        }
+      }
+    }
+    return appIds;
+  }
+
   public static class Builder {
     private long capacity;
     private double lowWaterMarkOfWrite;

Reply via email to