This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 23197e26ac HDDS-11605. Directory deletion service should support multiple threads (#7349)
23197e26ac is described below

commit 23197e26ac3fb4e2d3e6d66ddb96fe45714b165c
Author: Aryan Gupta <[email protected]>
AuthorDate: Mon Dec 9 17:42:48 2024 +0530

    HDDS-11605. Directory deletion service should support multiple threads (#7349)
---
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   5 +
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |   4 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   1 +
 ...TestSnapshotDeletingServiceIntegrationTest.java |   8 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  16 +-
 .../om/service/AbstractKeyDeletingService.java     |  10 +-
 .../ozone/om/service/DirectoryDeletingService.java | 227 +++++++++++++--------
 .../om/service/TestDirectoryDeletingService.java   |  12 +-
 8 files changed, 187 insertions(+), 96 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 2cf5e4ced3..880fe8614b 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -421,6 +421,11 @@ public final class OMConfigKeys {
   // resulting 24MB
   public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 6000;
 
+  public static final String OZONE_THREAD_NUMBER_DIR_DELETION =
+      "ozone.thread.number.dir.deletion";
+
+  public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10;
+
   public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
       "ozone.snapshot.filtering.limit.per.task";
   public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
index 8d161dedeb..78fb4c66fc 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java
@@ -529,7 +529,7 @@ public class TestDirectoryDeletingServiceWithFSO {
     when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> 
omSnapshotManager);
     DirectoryDeletingService service = Mockito.spy(new 
DirectoryDeletingService(1000, TimeUnit.MILLISECONDS, 1000,
         ozoneManager,
-        cluster.getConf()));
+        cluster.getConf(), 1));
     service.shutdown();
     final int initialSnapshotCount =
         (int) 
cluster.getOzoneManager().getMetadataManager().countRowsInTable(snapshotInfoTable);
@@ -563,7 +563,7 @@ public class TestDirectoryDeletingServiceWithFSO {
       }
       return i.callRealMethod();
     }).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(), 
anyLong(),
-        anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), 
Mockito.any(), any());
+        anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), 
Mockito.any(), any(), anyLong());
 
     Mockito.doAnswer(i -> {
       store.createSnapshot(testVolumeName, testBucketName, snap2);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 8a219514d3..1fbfc1f1f7 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -126,6 +126,7 @@ public class TestOzoneConfigurationFields extends 
TestConfigurationFieldsBase {
         OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY,
         OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
         OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
+        OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION,
         ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
         ScmConfigKeys.OZONE_SCM_HA_PREFIX,
         S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
index a9e5faa041..c3a58a1a21 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java
@@ -480,22 +480,22 @@ public class TestSnapshotDeletingServiceIntegrationTest {
 
   private DirectoryDeletingService 
getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
                                                                      
AtomicBoolean dirDeletionStarted)
-      throws InterruptedException, TimeoutException {
+      throws InterruptedException, TimeoutException, IOException {
     OzoneManager ozoneManager = Mockito.spy(om);
     om.getKeyManager().getDirDeletingService().shutdown();
     GenericTestUtils.waitFor(() -> 
om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
         100000);
     DirectoryDeletingService directoryDeletingService = Mockito.spy(new 
DirectoryDeletingService(10000,
-        TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf()));
+        TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1));
     directoryDeletingService.shutdown();
     GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount() 
== 0, 1000,
         100000);
-    when(ozoneManager.getMetadataManager()).thenAnswer(i -> {
+    doAnswer(i -> {
       // Wait for SDS to reach DDS wait block before processing any deleted 
directories.
       GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
       dirDeletionStarted.set(true);
       return i.callRealMethod();
-    });
+    }).when(directoryDeletingService).getPendingDeletedDirInfo();
     return directoryDeletingService;
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 7532cf8b32..ccda21efc9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -153,6 +153,8 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_S
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
 import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -257,8 +259,16 @@ public class KeyManagerImpl implements KeyManager {
           OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
           OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
           TimeUnit.MILLISECONDS);
-      dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
-          TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
+      int dirDeletingServiceCorePoolSize =
+          configuration.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
+              OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
+      if (dirDeletingServiceCorePoolSize <= 0) {
+        dirDeletingServiceCorePoolSize = 1;
+      }
+      dirDeletingService =
+          new DirectoryDeletingService(dirDeleteInterval, 
TimeUnit.MILLISECONDS,
+              serviceTimeout, ozoneManager, configuration,
+              dirDeletingServiceCorePoolSize);
       dirDeletingService.start();
     }
 
@@ -2052,7 +2062,7 @@ public class KeyManagerImpl implements KeyManager {
         parentInfo.getObjectID(), "");
     long countEntries = 0;
 
-    Table dirTable = metadataManager.getDirectoryTable();
+    Table<String, OmDirectoryInfo> dirTable = 
metadataManager.getDirectoryTable();
     try (TableIterator<String,
         ? extends Table.KeyValue<String, OmDirectoryInfo>>
         iterator = dirTable.iterator()) {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 76c16232e3..0ac6c98660 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -280,7 +280,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
 
   protected void submitPurgePaths(List<PurgePathRequest> requests,
                                   String snapTableKey,
-                                  UUID expectedPreviousSnapshotId) {
+                                  UUID expectedPreviousSnapshotId, long rnCnt) 
{
     OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest 
=
         OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
 
@@ -305,7 +305,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
 
     // Submit Purge paths request to OM
     try {
-      OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, 
runCount.get());
+      OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, 
rnCnt);
     } catch (ServiceException e) {
       LOG.error("PurgePaths request failed. Will retry at next run.", e);
     }
@@ -400,7 +400,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
       List<PurgePathRequest> purgePathRequestList,
       String snapTableKey, long startTime,
       int remainingBufLimit, KeyManager keyManager,
-      UUID expectedPreviousSnapshotId) {
+      UUID expectedPreviousSnapshotId, long rnCnt) {
 
     // Optimization to handle delete sub-dir and keys to remove quickly
     // This case will be useful to handle when depth of directory is high
@@ -442,7 +442,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
     }
 
     if (!purgePathRequestList.isEmpty()) {
-      submitPurgePaths(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId);
+      submitPurgePaths(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId, rnCnt);
     }
 
     if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -455,7 +455,7 @@ public abstract class AbstractKeyDeletingService extends 
BackgroundService
               "DeletedDirectoryTable, iteration elapsed: {}ms," +
               " totalRunCount: {}",
           dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
-          Time.monotonicNow() - startTime, getRunCount());
+          Time.monotonicNow() - startTime, rnCnt);
     }
     return remainNum;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 09f4a8f8a3..a8270f92f2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -49,6 +50,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -74,10 +76,10 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
   public static final Logger LOG =
       LoggerFactory.getLogger(DirectoryDeletingService.class);
 
-  // Use only a single thread for DirDeletion. Multiple threads would read
-  // or write to same tables and can send deletion requests for same key
-  // multiple times.
-  private static final int DIR_DELETING_CORE_POOL_SIZE = 1;
+  // Using multi thread for DirDeletion. Multiple threads would read
+  // from parent directory info from deleted directory table concurrently
+  // and send deletion requests.
+  private final int dirDeletingCorePoolSize;
   private static final int MIN_ERR_LIMIT_PER_TASK = 1000;
 
   // Number of items(dirs/files) to be batched in an iteration.
@@ -86,11 +88,15 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
   private final AtomicBoolean suspended;
   private AtomicBoolean isRunningOnAOS;
 
+  private final DeletedDirSupplier deletedDirSupplier;
+
+  private AtomicInteger taskCount = new AtomicInteger(0);
+
   public DirectoryDeletingService(long interval, TimeUnit unit,
       long serviceTimeout, OzoneManager ozoneManager,
-      OzoneConfiguration configuration) {
+      OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize) {
     super(DirectoryDeletingService.class.getSimpleName(), interval, unit,
-        DIR_DELETING_CORE_POOL_SIZE, serviceTimeout, ozoneManager, null);
+        dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager, null);
     this.pathLimitPerTask = configuration
         .getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
             OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
@@ -102,6 +108,9 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
     this.ratisByteLimit = (int) (limit * 0.9);
     this.suspended = new AtomicBoolean(false);
     this.isRunningOnAOS = new AtomicBoolean(false);
+    this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
+    deletedDirSupplier = new DeletedDirSupplier();
+    taskCount.set(0);
   }
 
   private boolean shouldRun() {
@@ -116,6 +125,10 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
     return isRunningOnAOS.get();
   }
 
+  public AtomicInteger getTaskCount() {
+    return taskCount;
+  }
+
   /**
    * Suspend the service.
    */
@@ -135,10 +148,55 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+    if (taskCount.get() > 0) {
+      LOG.info("{} Directory deleting task(s) already in progress.",
+          taskCount.get());
+      return queue;
+    }
+    try {
+      deletedDirSupplier.reInitItr();
+    } catch (IOException ex) {
+      LOG.error("Unable to get the iterator.", ex);
+      return queue;
+    }
+    taskCount.set(dirDeletingCorePoolSize);
+    for (int i = 0; i < dirDeletingCorePoolSize; i++) {
+      queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+    }
     return queue;
   }
 
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    deletedDirSupplier.closeItr();
+  }
+
+  private final class DeletedDirSupplier {
+    private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+        deleteTableIterator;
+
+    private synchronized Table.KeyValue<String, OmKeyInfo> get()
+        throws IOException {
+      if (deleteTableIterator.hasNext()) {
+        return deleteTableIterator.next();
+      }
+      return null;
+    }
+
+    private synchronized void closeItr() {
+      IOUtils.closeQuietly(deleteTableIterator);
+      deleteTableIterator = null;
+    }
+
+    private synchronized void reInitItr() throws IOException {
+      closeItr();
+      deleteTableIterator =
+          getOzoneManager().getMetadataManager().getDeletedDirTable()
+              .iterator();
+    }
+  }
+
   private final class DirDeletingTask implements BackgroundTask {
     private final DirectoryDeletingService directoryDeletingService;
 
@@ -153,89 +211,93 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
 
     @Override
     public BackgroundTaskResult call() {
-      if (shouldRun()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Running DirectoryDeletingService");
-        }
-        isRunningOnAOS.set(true);
-        getRunCount().incrementAndGet();
-        long dirNum = 0L;
-        long subDirNum = 0L;
-        long subFileNum = 0L;
-        long remainNum = pathLimitPerTask;
-        int consumedSize = 0;
-        List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
-        List<Pair<String, OmKeyInfo>> allSubDirList
-            = new ArrayList<>((int) remainNum);
-
-        Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
-
-        try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
-                 deleteTableIterator = getOzoneManager().getMetadataManager().
-            getDeletedDirTable().iterator()) {
+      try {
+        if (shouldRun()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Running DirectoryDeletingService");
+          }
+          isRunningOnAOS.set(true);
+          long rnCnt = getRunCount().incrementAndGet();
+          long dirNum = 0L;
+          long subDirNum = 0L;
+          long subFileNum = 0L;
+          long remainNum = pathLimitPerTask;
+          int consumedSize = 0;
+          List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+          List<Pair<String, OmKeyInfo>> allSubDirList =
+              new ArrayList<>((int) remainNum);
+
+          Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
           // This is to avoid race condition b/w purge request and snapshot 
chain updation. For AOS taking the global
           // snapshotId since AOS could process multiple buckets in one 
iteration.
-          UUID expectedPreviousSnapshotId =
-              
((OmMetadataManagerImpl)getOzoneManager().getMetadataManager()).getSnapshotChainManager()
-                  .getLatestGlobalSnapshotId();
-
-          long startTime = Time.monotonicNow();
-          while (remainNum > 0 && deleteTableIterator.hasNext()) {
-            pendingDeletedDirInfo = deleteTableIterator.next();
-            // Do not reclaim if the directory is still being referenced by
-            // the previous snapshot.
-            if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
-              continue;
-            }
+          try {
+            UUID expectedPreviousSnapshotId =
+                ((OmMetadataManagerImpl) 
getOzoneManager().getMetadataManager()).getSnapshotChainManager()
+                    .getLatestGlobalSnapshotId();
 
-            PurgePathRequest request = prepareDeleteDirRequest(
-                remainNum, pendingDeletedDirInfo.getValue(),
-                pendingDeletedDirInfo.getKey(), allSubDirList,
-                getOzoneManager().getKeyManager());
-            if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
-                request.getSerializedSize())) {
-              if (purgePathRequestList.size() != 0) {
-                // if message buffer reaches max limit, avoid sending further
-                remainNum = 0;
+            long startTime = Time.monotonicNow();
+            while (remainNum > 0) {
+              pendingDeletedDirInfo = getPendingDeletedDirInfo();
+              if (pendingDeletedDirInfo == null) {
                 break;
               }
-              // if directory itself is having a lot of keys / files,
-              // reduce capacity to minimum level
-              remainNum = MIN_ERR_LIMIT_PER_TASK;
-              request = prepareDeleteDirRequest(
-                  remainNum, pendingDeletedDirInfo.getValue(),
+              // Do not reclaim if the directory is still being referenced by
+              // the previous snapshot.
+              if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
+                continue;
+              }
+
+              PurgePathRequest request = prepareDeleteDirRequest(remainNum,
+                  pendingDeletedDirInfo.getValue(),
                   pendingDeletedDirInfo.getKey(), allSubDirList,
                   getOzoneManager().getKeyManager());
+              if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
+                  request.getSerializedSize())) {
+                if (purgePathRequestList.size() != 0) {
+                  // if message buffer reaches max limit, avoid sending further
+                  remainNum = 0;
+                  break;
+                }
+                // if directory itself is having a lot of keys / files,
+                // reduce capacity to minimum level
+                remainNum = MIN_ERR_LIMIT_PER_TASK;
+                request = prepareDeleteDirRequest(remainNum,
+                    pendingDeletedDirInfo.getValue(),
+                    pendingDeletedDirInfo.getKey(), allSubDirList,
+                    getOzoneManager().getKeyManager());
+              }
+              consumedSize += request.getSerializedSize();
+              purgePathRequestList.add(request);
+              // reduce remain count for self, sub-files, and sub-directories
+              remainNum = remainNum - 1;
+              remainNum = remainNum - request.getDeletedSubFilesCount();
+              remainNum = remainNum - request.getMarkDeletedSubDirsCount();
+              // Count up the purgeDeletedDir, subDirs and subFiles
+              if (request.getDeletedDir() != null && !request.getDeletedDir()
+                  .isEmpty()) {
+                dirNum++;
+              }
+              subDirNum += request.getMarkDeletedSubDirsCount();
+              subFileNum += request.getDeletedSubFilesCount();
             }
-            consumedSize += request.getSerializedSize();
-            purgePathRequestList.add(request);
-            // reduce remain count for self, sub-files, and sub-directories
-            remainNum = remainNum - 1;
-            remainNum = remainNum - request.getDeletedSubFilesCount();
-            remainNum = remainNum - request.getMarkDeletedSubDirsCount();
-            // Count up the purgeDeletedDir, subDirs and subFiles
-            if (request.getDeletedDir() != null
-                && !request.getDeletedDir().isEmpty()) {
-              dirNum++;
-            }
-            subDirNum += request.getMarkDeletedSubDirsCount();
-            subFileNum += request.getDeletedSubFilesCount();
-          }
+            optimizeDirDeletesAndSubmitRequest(remainNum, dirNum, subDirNum,
+                subFileNum, allSubDirList, purgePathRequestList, null,
+                startTime, ratisByteLimit - consumedSize,
+                getOzoneManager().getKeyManager(), expectedPreviousSnapshotId,
+                rnCnt);
 
-          optimizeDirDeletesAndSubmitRequest(
-              remainNum, dirNum, subDirNum, subFileNum,
-              allSubDirList, purgePathRequestList, null, startTime,
-              ratisByteLimit - consumedSize,
-              getOzoneManager().getKeyManager(), expectedPreviousSnapshotId);
-
-        } catch (IOException e) {
-          LOG.error("Error while running delete directories and files " +
-              "background task. Will retry at next run.", e);
-        }
-        isRunningOnAOS.set(false);
-        synchronized (directoryDeletingService) {
-          this.directoryDeletingService.notify();
+          } catch (IOException e) {
+            LOG.error(
+                "Error while running delete directories and files " + 
"background task. Will retry at next run.",
+                e);
+          }
+          isRunningOnAOS.set(false);
+          synchronized (directoryDeletingService) {
+            this.directoryDeletingService.notify();
+          }
         }
+      } finally {
+        taskCount.getAndDecrement();
       }
       // place holder by returning empty results of this call back.
       return BackgroundTaskResult.EmptyTaskResult.newResult();
@@ -301,4 +363,9 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
     }
   }
 
+  public KeyValue<String, OmKeyInfo> getPendingDeletedDirInfo()
+      throws IOException {
+    return deletedDirSupplier.get();
+  }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 04e8efa7b7..681b24b8e4 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -51,6 +51,9 @@ import org.junit.jupiter.api.io.TempDir;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -202,14 +205,19 @@ public class TestDirectoryDeletingService {
         .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
         .setDataSize(0).setRecursive(true).build();
     writeClient.deleteKey(delArgs);
+    int pathDelLimit = conf.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
+        OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
+    int numThread = conf.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
+        OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
 
     // check if difference between each run should not cross the directory 
deletion limit
     // and wait till all dir is removed
     GenericTestUtils.waitFor(() -> {
       delDirCnt[1] = dirDeletingService.getDeletedDirsCount();
-      assertTrue(delDirCnt[1] - delDirCnt[0] <= 
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT,
+      assertTrue(
+          delDirCnt[1] - delDirCnt[0] <= ((long) pathDelLimit * numThread),
           "base: " + delDirCnt[0] + ", new: " + delDirCnt[1]);
-      delDirCnt[0] =  delDirCnt[1];
+      delDirCnt[0] = delDirCnt[1];
       return dirDeletingService.getDeletedDirsCount() >= dirCreatesCount;
     }, 500, 300000);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to