This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e66ef484 [#1680] improvement(server): Remove partial HDFS files that 
were written by the server itself for expired apps (#1681)
8e66ef484 is described below

commit 8e66ef484894c479472192f7c2ea2ea33d6111e6
Author: xianjingfeng <[email protected]>
AuthorDate: Sat May 11 10:06:08 2024 +0800

    [#1680] improvement(server): Remove partial HDFS files that were written by 
the server itself for expired apps (#1681)
    
    ### What changes were proposed in this pull request?
    
    Only remove the HDFS files that were written by the server itself when the 
application is purged due to expiration.
    
    ### Why are the changes needed?
    
    Fix: #1680
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UT
---
 .../apache/uniffle/server/ShuffleTaskManager.java  |  5 +-
 .../apache/uniffle/server/event/AppPurgeEvent.java | 14 +++-
 .../server/storage/HadoopStorageManager.java       |  8 +-
 .../server/storage/HadoopStorageManagerTest.java   | 87 ++++++++++++++++++++++
 .../storage/factory/ShuffleHandlerFactory.java     |  2 +-
 .../handler/impl/HadoopShuffleDeleteHandler.java   | 31 +++++++-
 .../request/CreateShuffleDeleteHandlerRequest.java | 11 +++
 7 files changed, 151 insertions(+), 7 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 50ea004ad..8cf3e3eb2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -774,7 +774,10 @@ public class ShuffleTaskManager {
       shuffleFlushManager.removeResources(appId);
       storageManager.removeResources(
           new AppPurgeEvent(
-              appId, shuffleTaskInfo.getUser(), new 
ArrayList<>(shuffleTaskInfo.getShuffleIds())));
+              appId,
+              shuffleTaskInfo.getUser(),
+              new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
+              checkAppExpired));
       if (shuffleTaskInfo.hasHugePartition()) {
         ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
         ShuffleServerMetrics.gaugeHugePartitionNum.dec();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java 
b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
index 804d3849d..cb510d5c6 100644
--- a/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/event/AppPurgeEvent.java
@@ -20,12 +20,22 @@ package org.apache.uniffle.server.event;
 import java.util.List;
 
 public class AppPurgeEvent extends PurgeEvent {
+  private final boolean appExpired;
 
-  public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+  public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds, 
boolean appExpired) {
     super(appId, user, shuffleIds);
+    this.appExpired = appExpired;
+  }
+
+  public AppPurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+    this(appId, user, shuffleIds, false);
   }
 
   public AppPurgeEvent(String appId, String user) {
-    super(appId, user, null);
+    this(appId, user, null);
+  }
+
+  public boolean isAppExpired() {
+    return appExpired;
   }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 496c48551..7ab9ef639 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -57,12 +57,14 @@ public class HadoopStorageManager extends 
SingleStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStorageManager.class);
 
   private final Configuration hadoopConf;
+  private final String shuffleServerId;
   private Map<String, HadoopStorage> appIdToStorages = 
JavaUtils.newConcurrentMap();
   private Map<String, HadoopStorage> pathToStorages = 
JavaUtils.newConcurrentMap();
 
   HadoopStorageManager(ShuffleServerConf conf) {
     super(conf);
     hadoopConf = conf.getHadoopConf();
+    shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, 
"shuffleServerId");
   }
 
   @Override
@@ -94,15 +96,19 @@ public class HadoopStorageManager extends 
SingleStorageManager {
     String appId = event.getAppId();
     HadoopStorage storage = getStorageByAppId(appId);
     if (storage != null) {
+      boolean purgeForExpired = false;
       if (event instanceof AppPurgeEvent) {
         storage.removeHandlers(appId);
         appIdToStorages.remove(appId);
+        purgeForExpired = ((AppPurgeEvent) event).isAppExpired();
       }
       ShuffleDeleteHandler deleteHandler =
           ShuffleHandlerFactory.getInstance()
               .createShuffleDeleteHandler(
                   new CreateShuffleDeleteHandlerRequest(
-                      StorageType.HDFS.name(), storage.getConf()));
+                      StorageType.HDFS.name(),
+                      storage.getConf(),
+                      purgeForExpired ? shuffleServerId : null));
 
       String basicPath =
           
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
index c9fee7436..663a7bfc5 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/HadoopStorageManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server.storage;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -33,11 +35,14 @@ import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.storage.common.HadoopStorage;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class HadoopStorageManagerTest {
 
@@ -112,4 +117,86 @@ public class HadoopStorageManagerTest {
     assertNull(hs3.getConf().get("k2"));
     assertNull(hs3.getConf().get("k3"));
   }
+
+  @Test
+  public void testRemoveExpiredResourcesWithTwoReplicas(@TempDir File 
remoteBasePath)
+      throws Exception {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    String shuffleServerId = "127.0.0.1:19999";
+    conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
+    HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
+    final String remoteStoragePath1 = new File(remoteBasePath, 
"path1").getAbsolutePath();
+    String appId = "testRemoveExpiredResources";
+    hadoopStorageManager.registerRemoteStorage(
+        appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", 
"v1", "k2", "v2")));
+    Map<String, HadoopStorage> appStorageMap = 
hadoopStorageManager.getAppIdToStorages();
+
+    HadoopStorage storage = appStorageMap.get(appId);
+    String appPath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+    File appPathFile = new File(appPath);
+    File partitionDir = new File(appPathFile, "1/1-1/");
+    partitionDir.mkdirs();
+    // Simulate the case that there are two shuffle servers write data.
+    File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
+    dataFile.createNewFile();
+    File dataFile2 = new File(partitionDir, "shuffleserver2_1.data");
+    dataFile2.createNewFile();
+    assertTrue(partitionDir.exists());
+    // Purged for expired
+    assertEquals(1, appStorageMap.size());
+    AppPurgeEvent shufflePurgeEvent = new AppPurgeEvent(appId, "", null, true);
+    hadoopStorageManager.removeResources(shufflePurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    // The directory of the partition should have not been deleted, for it was 
not empty.
+    assertTrue(partitionDir.exists());
+    assertFalse(dataFile.exists());
+    assertTrue(dataFile2.exists());
+
+    // Purged for unregister
+    AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+    hadoopStorageManager.removeResources(appPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    assertFalse(appPathFile.exists());
+  }
+
+  @Test
+  public void testRemoveExpiredResourcesWithOneReplica(@TempDir File 
remoteBasePath)
+      throws Exception {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(
+        ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    String shuffleServerId = "127.0.0.1:19999";
+    conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
+    HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
+    final String remoteStoragePath1 = new File(remoteBasePath, 
"path1").getAbsolutePath();
+    String appId = "testRemoveExpiredResources2";
+    hadoopStorageManager.registerRemoteStorage(
+        appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", 
"v1", "k2", "v2")));
+    Map<String, HadoopStorage> appStorageMap = 
hadoopStorageManager.getAppIdToStorages();
+
+    HadoopStorage storage = appStorageMap.get(appId);
+    String appPath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
+    File appPathFile = new File(appPath);
+    File partitionDir = new File(appPathFile, "1/1-1/");
+    partitionDir.mkdirs();
+    // Simulate the case that only one shuffle server writes data.
+    File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
+    dataFile.createNewFile();
+    assertTrue(partitionDir.exists());
+    // purged for expired
+    assertEquals(1, appStorageMap.size());
+    AppPurgeEvent shufflePurgeEvent = new AppPurgeEvent(appId, "", null, true);
+    hadoopStorageManager.removeResources(shufflePurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    // The directory of the application should have been deleted, for it was 
empty.
+    assertFalse(partitionDir.exists());
+
+    // purged for unregister
+    AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+    hadoopStorageManager.removeResources(appPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+    assertFalse(appPathFile.exists());
+  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 819c26e04..eac9584e2 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -185,7 +185,7 @@ public class ShuffleHandlerFactory {
   public ShuffleDeleteHandler createShuffleDeleteHandler(
       CreateShuffleDeleteHandlerRequest request) {
     if (StorageType.HDFS.name().equals(request.getStorageType())) {
-      return new HadoopShuffleDeleteHandler(request.getConf());
+      return new HadoopShuffleDeleteHandler(request.getConf(), 
request.getShuffleServerId());
     } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
       return new LocalFileDeleteHandler();
     } else {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
index e3daa8ef3..d48d72402 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
@@ -17,7 +17,11 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -29,11 +33,13 @@ import 
org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
 public class HadoopShuffleDeleteHandler implements ShuffleDeleteHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopShuffleDeleteHandler.class);
+  private final String shuffleServerId;
 
   private Configuration hadoopConf;
 
-  public HadoopShuffleDeleteHandler(Configuration hadoopConf) {
+  public HadoopShuffleDeleteHandler(Configuration hadoopConf, String 
shuffleServerId) {
     this.hadoopConf = hadoopConf;
+    this.shuffleServerId = shuffleServerId;
   }
 
   @Override
@@ -52,7 +58,7 @@ public class HadoopShuffleDeleteHandler implements 
ShuffleDeleteHandler {
       while (!isSuccess && times < retryMax) {
         try {
           FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, 
path, hadoopConf);
-          fileSystem.delete(path, true);
+          delete(fileSystem, path, shuffleServerId);
           isSuccess = true;
         } catch (Exception e) {
           times++;
@@ -86,4 +92,25 @@ public class HadoopShuffleDeleteHandler implements 
ShuffleDeleteHandler {
       }
     }
   }
+
+  private void delete(FileSystem fileSystem, Path path, String filePrefix) 
throws IOException {
+    if (filePrefix == null) {
+      fileSystem.delete(path, true);
+      return;
+    }
+    FileStatus[] fileStatuses = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        delete(fileSystem, fileStatus.getPath(), filePrefix);
+      } else {
+        if (fileStatus.getPath().getName().startsWith(filePrefix)) {
+          fileSystem.delete(fileStatus.getPath(), true);
+        }
+      }
+    }
+    ContentSummary contentSummary = fileSystem.getContentSummary(path);
+    if (contentSummary.getFileCount() == 0) {
+      fileSystem.delete(path, true);
+    }
+  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
index 6df61e58c..b8eda2895 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
@@ -23,10 +23,17 @@ public class CreateShuffleDeleteHandlerRequest {
 
   private String storageType;
   private Configuration conf;
+  private String shuffleServerId;
 
   public CreateShuffleDeleteHandlerRequest(String storageType, Configuration 
conf) {
+    this(storageType, conf, null);
+  }
+
+  public CreateShuffleDeleteHandlerRequest(
+      String storageType, Configuration conf, String shuffleServerId) {
     this.storageType = storageType;
     this.conf = conf;
+    this.shuffleServerId = shuffleServerId;
   }
 
   public String getStorageType() {
@@ -36,4 +43,8 @@ public class CreateShuffleDeleteHandlerRequest {
   public Configuration getConf() {
     return conf;
   }
+
+  public String getShuffleServerId() {
+    return shuffleServerId;
+  }
 }

Reply via email to