This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ee00df7a6 HDDS-13844. Decouple DirectoryDeletingService delete 
batching from Ratis request size. (#9270)
63ee00df7a6 is described below

commit 63ee00df7a6c663f629e43fdbf615c8ffed74613
Author: Aryan Gupta <[email protected]>
AuthorDate: Fri Nov 21 01:48:49 2025 +0530

    HDDS-13844. Decouple DirectoryDeletingService delete batching from Ratis 
request size. (#9270)
---
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   5 +
 .../TestDirectoryDeletingServiceWithFSO.java       |   5 +-
 .../apache/hadoop/ozone/om/DeleteKeysResult.java   |  10 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  13 +--
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  38 +++----
 .../ozone/om/service/DirectoryDeletingService.java | 124 ++++++++++++++-------
 .../om/service/TestDirectoryDeletingService.java   |  85 +++++++++++++-
 8 files changed, 205 insertions(+), 83 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 1e8df1c6747..5d36eb3b8f2 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -701,6 +701,14 @@
       hdds.container.ratis.datanode.storage.dir be configured separately.
     </description>
   </property>
+  <property>
+    <name>ozone.path.deleting.limit.per.task</name>
+    <value>20000</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>The maximum number of paths (dirs/files) to be deleted by
+      the directory deleting service per time interval.
+    </description>
+  </property>
   <property>
     <name>ozone.metadata.dirs.permissions</name>
     <value>750</value>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 469900aa8ea..ce00ec86b92 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -385,6 +385,11 @@ public final class OMConfigKeys {
   public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT
       = "60s";
 
+  public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK =
+      "ozone.path.deleting.limit.per.task";
+  // default is 20000, taking into account the 32MB buffer size
+  public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 20000;
+
   /**
    * Configuration properties for Snapshot Directory Service.
    */
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index 81e1dd3b444..eb77ac1dce3 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -623,9 +624,9 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
       }
       return null;
     }).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(),
-        anyLong(), anyList(), anyList(), eq(null), anyLong(), anyLong(), any(),
+        anyLong(), anyList(), anyList(), eq(null), anyLong(), any(),
         any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), 
anyMap(), any(),
-        anyLong());
+        anyLong(), any(AtomicInteger.class));
 
     Mockito.doAnswer(i -> {
       store.createSnapshot(testVolumeName, testBucketName, snap2);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
index 60378467d6d..2b685edf273 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
@@ -27,14 +27,11 @@
 public class DeleteKeysResult {
 
   private List<OmKeyInfo> keysToDelete;
-  private long consumedSize;
 
   private boolean processedKeys;
 
-  public DeleteKeysResult(List<OmKeyInfo> keysToDelete,
-      long consumedSize, boolean processedKeys) {
+  public DeleteKeysResult(List<OmKeyInfo> keysToDelete, boolean processedKeys) 
{
     this.keysToDelete = keysToDelete;
-    this.consumedSize = consumedSize;
     this.processedKeys = processedKeys;
   }
 
@@ -42,11 +39,8 @@ public List<OmKeyInfo> getKeysToDelete() {
     return keysToDelete;
   }
 
-  public long getConsumedSize() {
-    return consumedSize;
-  }
-
   public boolean isProcessedKeys() {
     return processedKeys;
   }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 872a99e94b1..b0562049f71 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -306,9 +306,9 @@ default List<Table.KeyValue<String, OmKeyInfo>> 
getDeletedDirEntries(String volu
    * @return list of dirs
    * @throws IOException
    */
-  DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId,
-      OmKeyInfo parentInfo, CheckedFunction<Table.KeyValue<String, OmKeyInfo>, 
Boolean, IOException> filter,
-      long remainingBufLimit) throws IOException;
+  DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, 
OmKeyInfo parentInfo,
+      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, int remainingNum)
+      throws IOException;
 
   /**
    * Returns all sub files under the given parent directory.
@@ -317,10 +317,9 @@ DeleteKeysResult getPendingDeletionSubDirs(long volumeId, 
long bucketId,
    * @return list of files
    * @throws IOException
    */
-  DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
-      long bucketId, OmKeyInfo parentInfo,
-      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, long remainingBufLimit)
-          throws IOException;
+  DeleteKeysResult getPendingDeletionSubFiles(long volumeId, long bucketId, 
OmKeyInfo parentInfo,
+      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, int remainingNum)
+      throws IOException;
 
   /**
    * Returns the instance of Directory Deleting Service.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index eaca8ab2c52..19ef77d3457 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -2276,49 +2276,37 @@ private void slimLocationVersion(OmKeyInfo... keyInfos) 
{
   }
 
   @Override
-  public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long 
bucketId,
-      OmKeyInfo parentInfo, CheckedFunction<KeyValue<String, OmKeyInfo>, 
Boolean, IOException> filter,
-      long remainingBufLimit) throws IOException {
+  public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long 
bucketId, OmKeyInfo parentInfo,
+      CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, int remainingNum) throws IOException {
     return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo, 
metadataManager.getDirectoryTable(),
         kv -> 
Table.newKeyValue(metadataManager.getOzoneDeletePathKey(kv.getValue().getObjectID(),
 kv.getKey()),
-            OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())),
-        filter, remainingBufLimit);
+            OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())), 
filter, remainingNum);
   }
 
-  private <T extends WithParentObjectId> DeleteKeysResult 
gatherSubPathsWithIterator(
-      long volumeId, long bucketId, OmKeyInfo parentInfo,
-      Table<String, T> table,
+  private <T extends WithParentObjectId> DeleteKeysResult 
gatherSubPathsWithIterator(long volumeId, long bucketId,
+      OmKeyInfo parentInfo, Table<String, T> table,
       CheckedFunction<KeyValue<String, T>, KeyValue<String, OmKeyInfo>, 
IOException> deleteKeyTransformer,
-      CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
deleteKeyFilter,
-      long remainingBufLimit) throws IOException {
+      CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
deleteKeyFilter, int remainingNum)
+      throws IOException {
     List<OmKeyInfo> keyInfos = new ArrayList<>();
-    String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId,
-        parentInfo.getObjectID(), "");
-    long consumedSize = 0;
+    String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId, 
parentInfo.getObjectID(), "");
     try (TableIterator<String, ? extends KeyValue<String, T>> iterator = 
table.iterator(seekFileInDB)) {
-      while (iterator.hasNext() && remainingBufLimit > 0) {
+      while (iterator.hasNext() && remainingNum > 0) {
         KeyValue<String, T> entry = iterator.next();
-        final long objectSerializedSize = entry.getValueByteSize();
-        // No need to check the table again as the value in cache and iterator 
would be same when directory
-        // deleting service runs.
-        if (remainingBufLimit - objectSerializedSize < 0) {
-          break;
-        }
         KeyValue<String, OmKeyInfo> keyInfo = 
deleteKeyTransformer.apply(entry);
         if (deleteKeyFilter.apply(keyInfo)) {
           keyInfos.add(keyInfo.getValue());
-          remainingBufLimit -= objectSerializedSize;
-          consumedSize += objectSerializedSize;
+          remainingNum--;
         }
       }
-      return new DeleteKeysResult(keyInfos, consumedSize, !iterator.hasNext());
+      return new DeleteKeysResult(keyInfos, !iterator.hasNext());
     }
   }
 
   @Override
   public DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
       long bucketId, OmKeyInfo parentInfo,
-      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, long remainingBufLimit)
+      CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
filter, int remainingNum)
           throws IOException {
     CheckedFunction<KeyValue<String, OmKeyInfo>, KeyValue<String, OmKeyInfo>, 
IOException> tranformer = kv -> {
       OmKeyInfo keyInfo = OMFileRequest.getKeyInfoWithFullPath(parentInfo, 
kv.getValue());
@@ -2327,7 +2315,7 @@ public DeleteKeysResult getPendingDeletionSubFiles(long 
volumeId,
       return Table.newKeyValue(deleteKey, keyInfo);
     };
     return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo, 
metadataManager.getFileTable(), tranformer,
-        filter, remainingBufLimit);
+        filter, remainingNum);
   }
 
   public boolean isBucketFSOptimized(String volName, String buckName)
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 45256a7ff05..a79eeda74f2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -19,6 +19,8 @@
 
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
 
@@ -44,6 +46,7 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -160,6 +163,7 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
   private final AtomicLong deletedDirsCount;
   private final AtomicLong movedDirsCount;
   private final AtomicLong movedFilesCount;
+  private final int pathLimitPerTask;
 
   public DirectoryDeletingService(long interval, TimeUnit unit,
       long serviceTimeout, OzoneManager ozoneManager,
@@ -181,6 +185,8 @@ public DirectoryDeletingService(long interval, TimeUnit 
unit,
     this.deletedDirsCount = new AtomicLong(0);
     this.movedDirsCount = new AtomicLong(0);
     this.movedFilesCount = new AtomicLong(0);
+    this.pathLimitPerTask =
+        configuration.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK, 
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
   }
 
   public void registerReconfigCallbacks(ReconfigurationHandler handler) {
@@ -261,31 +267,28 @@ void optimizeDirDeletesAndSubmitRequest(
       List<Pair<String, OmKeyInfo>> allSubDirList,
       List<PurgePathRequest> purgePathRequestList,
       String snapTableKey, long startTime,
-      long remainingBufLimit, KeyManager keyManager,
+      KeyManager keyManager,
       CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
reclaimableDirChecker,
       CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
reclaimableFileChecker,
       Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap,
-      UUID expectedPreviousSnapshotId, long rnCnt) {
+      UUID expectedPreviousSnapshotId, long rnCnt, AtomicInteger remainNum) {
 
     // Optimization to handle delete sub-dir and keys to remove quickly
     // This case will be useful to handle when depth of directory is high
     int subdirDelNum = 0;
     int subDirRecursiveCnt = 0;
-    int consumedSize = 0;
-    while (subDirRecursiveCnt < allSubDirList.size() && remainingBufLimit > 0) 
{
+    while (subDirRecursiveCnt < allSubDirList.size() && remainNum.get() > 0) {
       try {
         Pair<String, OmKeyInfo> stringOmKeyInfoPair = 
allSubDirList.get(subDirRecursiveCnt++);
         Boolean subDirectoryReclaimable = 
reclaimableDirChecker.apply(Table.newKeyValue(stringOmKeyInfoPair.getKey(),
             stringOmKeyInfoPair.getValue()));
         Optional<PurgePathRequest> request = prepareDeleteDirRequest(
             stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(), 
subDirectoryReclaimable, allSubDirList,
-            keyManager, reclaimableFileChecker, remainingBufLimit);
+            keyManager, reclaimableFileChecker, remainNum);
         if (!request.isPresent()) {
           continue;
         }
         PurgePathRequest requestVal = request.get();
-        consumedSize += requestVal.getSerializedSize();
-        remainingBufLimit -= consumedSize;
         purgePathRequestList.add(requestVal);
         // Count up the purgeDeletedDir, subDirs and subFiles
         if (requestVal.hasDeletedDir() && 
!StringUtils.isBlank(requestVal.getDeletedDir())) {
@@ -300,7 +303,7 @@ void optimizeDirDeletesAndSubmitRequest(
       }
     }
     if (!purgePathRequestList.isEmpty()) {
-      submitPurgePaths(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId, bucketNameInfoMap);
+      submitPurgePathsWithBatching(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId, bucketNameInfoMap);
     }
 
     if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -378,7 +381,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
       List<Pair<String, OmKeyInfo>> subDirList,
       KeyManager keyManager,
       CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> 
reclaimableFileFilter,
-      long remainingBufLimit) throws IOException {
+      AtomicInteger remainNum) throws IOException {
     // step-0: Get one pending deleted directory
     if (LOG.isDebugEnabled()) {
       LOG.debug("Pending deleted dir name: {}",
@@ -389,12 +392,13 @@ private Optional<PurgePathRequest> 
prepareDeleteDirRequest(
         .getVolumeBucketIdPairFSO(delDirName);
 
     // step-1: get all sub directories under the deletedDir
+    int remainingNum = remainNum.get();
     DeleteKeysResult subDirDeleteResult =
         keyManager.getPendingDeletionSubDirs(volumeBucketId.getVolumeId(), 
volumeBucketId.getBucketId(),
-            pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit);
+            pendingDeletedDirInfo, keyInfo -> true, remainingNum);
     List<OmKeyInfo> subDirs = subDirDeleteResult.getKeysToDelete();
     subDirs.forEach(omKeyInfo -> omKeyInfo.setAcls(Collections.emptyList()));
-    remainingBufLimit -= subDirDeleteResult.getConsumedSize();
+    remainNum.addAndGet(-subDirs.size());
 
     OMMetadataManager omMetadataManager = keyManager.getMetadataManager();
     for (OmKeyInfo dirInfo : subDirs) {
@@ -408,11 +412,13 @@ private Optional<PurgePathRequest> 
prepareDeleteDirRequest(
 
     // step-2: get all sub files under the deletedDir
     // Only remove sub files if the parent directory is going to be deleted or 
can be reclaimed.
+    remainingNum = remainNum.get();
     DeleteKeysResult subFileDeleteResult =
         keyManager.getPendingDeletionSubFiles(volumeBucketId.getVolumeId(), 
volumeBucketId.getBucketId(),
-            pendingDeletedDirInfo, keyInfo -> purgeDir || 
reclaimableFileFilter.apply(keyInfo), remainingBufLimit);
+            pendingDeletedDirInfo, keyInfo -> purgeDir || 
reclaimableFileFilter.apply(keyInfo), remainingNum);
     List<OmKeyInfo> subFiles = subFileDeleteResult.getKeysToDelete();
     subFiles.forEach(omKeyInfo -> omKeyInfo.setAcls(Collections.emptyList()));
+    remainNum.addAndGet(-subFiles.size());
 
     if (LOG.isDebugEnabled()) {
       for (OmKeyInfo fileInfo : subFiles) {
@@ -422,11 +428,14 @@ private Optional<PurgePathRequest> 
prepareDeleteDirRequest(
 
     // step-3: If both sub-dirs and sub-files are exhausted under a parent
     // directory, only then delete the parent.
-    String purgeDeletedDir = purgeDir && subDirDeleteResult.isProcessedKeys() 
&&
-        subFileDeleteResult.isProcessedKeys() ? delDirName :  null;
+    String purgeDeletedDir =
+        purgeDir && subDirDeleteResult.isProcessedKeys() && 
subFileDeleteResult.isProcessedKeys() ? delDirName : null;
     if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) {
       return Optional.empty();
     }
+    if (purgeDeletedDir != null) {
+      remainNum.addAndGet(-1);
+    }
     return Optional.of(wrapPurgeRequest(volumeBucketId.getVolumeId(), 
volumeBucketId.getBucketId(),
         purgeDeletedDir, subFiles, subDirs));
   }
@@ -461,8 +470,50 @@ private OzoneManagerProtocolProtos.PurgePathRequest 
wrapPurgeRequest(
     return purgePathsRequest.build();
   }
 
-  private OzoneManagerProtocolProtos.OMResponse 
submitPurgePaths(List<PurgePathRequest> requests,
+  private List<OzoneManagerProtocolProtos.OMResponse> 
submitPurgePathsWithBatching(List<PurgePathRequest> requests,
       String snapTableKey, UUID expectedPreviousSnapshotId, 
Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap) {
+
+    List<OzoneManagerProtocolProtos.OMResponse> responses = new ArrayList<>();
+    List<PurgePathRequest> purgePathRequestBatch = new ArrayList<>();
+    long batchBytes = 0;
+
+    for (PurgePathRequest req : requests) {
+      int reqSize = req.getSerializedSize();
+
+      // If adding this request would exceed the limit, flush the current 
batch first
+      if (batchBytes + reqSize > ratisByteLimit && 
!purgePathRequestBatch.isEmpty()) {
+        OzoneManagerProtocolProtos.OMResponse resp =
+            submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId, 
bucketNameInfoMap, purgePathRequestBatch);
+        if (!resp.getSuccess()) {
+          return Collections.emptyList();
+        }
+        responses.add(resp);
+        purgePathRequestBatch.clear();
+        batchBytes = 0;
+      }
+
+      // Add current request to batch
+      purgePathRequestBatch.add(req);
+      batchBytes += reqSize;
+    }
+
+    // Flush remaining batch if any
+    if (!purgePathRequestBatch.isEmpty()) {
+      OzoneManagerProtocolProtos.OMResponse resp =
+          submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId, 
bucketNameInfoMap, purgePathRequestBatch);
+      if (!resp.getSuccess()) {
+        return Collections.emptyList();
+      }
+      responses.add(resp);
+    }
+
+    return responses;
+  }
+
+  @VisibleForTesting
+  OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey,
+      UUID expectedPreviousSnapshotId, Map<VolumeBucketId, BucketNameInfo> 
bucketNameInfoMap,
+      List<PurgePathRequest> pathRequests) {
     OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest 
=
         OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
 
@@ -476,17 +527,14 @@ private OzoneManagerProtocolProtos.OMResponse 
submitPurgePaths(List<PurgePathReq
     }
     
purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
 
-    purgeDirRequest.addAllDeletedPath(requests);
-    
purgeDirRequest.addAllBucketNameInfos(requests.stream().map(purgePathRequest ->
-        new VolumeBucketId(purgePathRequest.getVolumeId(), 
purgePathRequest.getBucketId())).distinct()
-        
.map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
+    purgeDirRequest.addAllDeletedPath(pathRequests);
+    purgeDirRequest.addAllBucketNameInfos(pathRequests.stream()
+        .map(purgePathRequest -> new 
VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId()))
+        
.distinct().map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
 
     OzoneManagerProtocolProtos.OMRequest omRequest =
-        OzoneManagerProtocolProtos.OMRequest.newBuilder()
-            .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
-            .setPurgeDirectoriesRequest(purgeDirRequest)
-            .setClientId(getClientId().toString())
-            .build();
+        
OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+            
.setPurgeDirectoriesRequest(purgeDirRequest).setClientId(getClientId().toString()).build();
 
     // Submit Purge paths request to OM. Acquire bootstrap lock when 
processing deletes for snapshots.
     try {
@@ -528,8 +576,8 @@ private 
OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
      * @param keyManager KeyManager of the underlying store.
      */
     @VisibleForTesting
-    void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, 
KeyManager keyManager,
-        long remainingBufLimit, long rnCnt) throws IOException, 
ExecutionException, InterruptedException {
+    void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, 
KeyManager keyManager, long rnCnt, int remainNum)
+        throws IOException, ExecutionException, InterruptedException {
       String volume, bucket; String snapshotTableKey;
       if (currentSnapshotInfo != null) {
         volume = currentSnapshotInfo.getVolumeName();
@@ -553,8 +601,8 @@ void processDeletedDirsForStore(SnapshotInfo 
currentSnapshotInfo, KeyManager key
         for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
           CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() 
-> {
             try {
-              return processDeletedDirectories(currentSnapshotInfo, 
keyManager, dirSupplier, remainingBufLimit,
-                  expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt);
+              return processDeletedDirectories(currentSnapshotInfo, 
keyManager, dirSupplier,
+                  expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, 
remainNum);
             } catch (Throwable e) {
               return false;
             }
@@ -594,16 +642,16 @@ void processDeletedDirsForStore(SnapshotInfo 
currentSnapshotInfo, KeyManager key
      * @param currentSnapshotInfo Information about the current snapshot whose 
deleted directories are being processed.
      * @param keyManager Key manager of the underlying storage system to 
handle key operations.
      * @param dirSupplier Supplier for fetching pending deleted directories to 
be processed.
-     * @param remainingBufLimit Remaining buffer limit for processing 
directories and files.
      * @param expectedPreviousSnapshotId The UUID of the previous snapshot 
expected in the chain.
      * @param totalExclusiveSizeMap A map for storing total exclusive size and 
exclusive replicated size
      *                              for each snapshot.
      * @param runCount The number of times the processing task has been 
executed.
+     * @param remaining Maximum number of paths (dirs/files) remaining to be processed.
      * @return A boolean indicating whether the processed directory list is 
empty.
      */
     private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyManager keyManager,
-        DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID 
expectedPreviousSnapshotId,
-        Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount) {
+        DeletedDirSupplier dirSupplier, UUID expectedPreviousSnapshotId,
+        Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount, int 
remaining) {
       OmSnapshotManager omSnapshotManager = 
getOzoneManager().getOmSnapshotManager();
       IOzoneManagerLock lock = 
getOzoneManager().getMetadataManager().getLock();
       String snapshotTableKey = currentSnapshotInfo == null ? null : 
currentSnapshotInfo.getTableKey();
@@ -615,12 +663,12 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
         long dirNum = 0L;
         long subDirNum = 0L;
         long subFileNum = 0L;
-        int consumedSize = 0;
         List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
         Map<VolumeBucketId, BucketNameInfo> bucketNameInfos = new HashMap<>();
+        AtomicInteger remainNum = new AtomicInteger(remaining);
 
         List<Pair<String, OmKeyInfo>> allSubDirList = new ArrayList<>();
-        while (remainingBufLimit > 0) {
+        while (remainNum.get() > 0) {
           KeyValue<String, OmKeyInfo> pendingDeletedDirInfo = 
dirSupplier.get();
           if (pendingDeletedDirInfo == null) {
             break;
@@ -639,13 +687,11 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
           Optional<PurgePathRequest> request = prepareDeleteDirRequest(
               pendingDeletedDirInfo.getValue(),
               pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
-              getOzoneManager().getKeyManager(), reclaimableFileFilter, 
remainingBufLimit);
+              getOzoneManager().getKeyManager(), reclaimableFileFilter, 
remainNum);
           if (!request.isPresent()) {
             continue;
           }
           PurgePathRequest purgePathRequest = request.get();
-          consumedSize += purgePathRequest.getSerializedSize();
-          remainingBufLimit -= consumedSize;
           purgePathRequestList.add(purgePathRequest);
           // Count up the purgeDeletedDir, subDirs and subFiles
           if (purgePathRequest.hasDeletedDir() && 
!StringUtils.isBlank(purgePathRequest.getDeletedDir())) {
@@ -657,9 +703,9 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
 
         optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
             subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
-            startTime, remainingBufLimit, getOzoneManager().getKeyManager(),
+            startTime, getOzoneManager().getKeyManager(),
             reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, 
expectedPreviousSnapshotId,
-            runCount);
+            runCount, remainNum);
         Map<UUID, Long> exclusiveReplicatedSizeMap = 
reclaimableFileFilter.getExclusiveReplicatedSizeMap();
         Map<UUID, Long> exclusiveSizeMap = 
reclaimableFileFilter.getExclusiveSizeMap();
         List<UUID> previousPathSnapshotsInChain =
@@ -719,7 +765,7 @@ public BackgroundTaskResult call() {
                   snapInfo.getName())) {
             KeyManager keyManager = snapInfo == null ? 
getOzoneManager().getKeyManager()
                 : omSnapshot.get().getKeyManager();
-            processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit, 
run);
+            processDeletedDirsForStore(snapInfo, keyManager, run, 
pathLimitPerTask);
           }
         } catch (IOException | ExecutionException e) {
           LOG.error("Error while running delete files background task for 
store {}. Will retry at next run.",
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 9fabe5a4650..06b70dca905 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -24,20 +24,26 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mockStatic;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
 import org.apache.hadoop.ozone.om.KeyManager;
@@ -51,11 +57,13 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.MockedStatic;
@@ -185,7 +193,7 @@ public void testMultithreadedDirectoryDeletion() throws Exception {
         = new OmTestManagers(conf);
     OzoneManager ozoneManager = omTestManagers.getOzoneManager();
     AtomicBoolean isRunning = new AtomicBoolean(true);
-    try (MockedStatic mockedStatic = Mockito.mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
+    try (MockedStatic mockedStatic = mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
       List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>();
       Thread deletionThread = new Thread(() -> {
         while (futureList.size() < threadCount) {
@@ -221,7 +229,7 @@ public void testMultithreadedDirectoryDeletion() throws Exception {
       DirectoryDeletingService.DirDeletingTask dirDeletingTask =
           ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null);
 
-      dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), Long.MAX_VALUE, 1);
+      dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000);
       assertThat(futureList).hasSize(threadCount);
       for (Pair<Supplier, CompletableFuture> pair : futureList) {
         assertTrue(pair.getRight().isDone());
@@ -231,4 +239,77 @@ public void testMultithreadedDirectoryDeletion() throws Exception {
       ozoneManager.stop();
     }
   }
+
+  @Test
+  @DisplayName("DirectoryDeletingService batches PurgeDirectories by Ratis byte limit (via submitRequest spy)")
+  void testPurgeDirectoriesBatching() throws Exception {
+    final int ratisLimitBytes = 2304;
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File testDir = Files.createTempDirectory("TestDDS-SubmitSpy").toFile();
+    ServerUtils.setOzoneMetaDirPath(conf, testDir.toString());
+    conf.setTimeDuration(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, ratisLimitBytes, StorageUnit.BYTES);
+    conf.setQuietMode(false);
+
+    OmTestManagers managers = new OmTestManagers(conf);
+    om = managers.getOzoneManager();
+    KeyManager km = managers.getKeyManager();
+
+    DirectoryDeletingService real = (DirectoryDeletingService) km.getDirDeletingService();
+    DirectoryDeletingService dds = Mockito.spy(real);
+
+    List<OzoneManagerProtocolProtos.OMRequest> captured = new ArrayList<>();
+    Mockito.doAnswer(inv -> {
+      OzoneManagerProtocolProtos.OMRequest req = inv.getArgument(0);
+      captured.add(req);
+      return OzoneManagerProtocolProtos.OMResponse.newBuilder()
+          .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories).setStatus(OzoneManagerProtocolProtos.Status.OK)
+          .build();
+    }).when(dds).submitRequest(Mockito.any(OzoneManagerProtocolProtos.OMRequest.class));
+
+    final long volumeId = 1L, bucketId = 2L;
+    List<OzoneManagerProtocolProtos.PurgePathRequest> purgeList = new ArrayList<>();
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 30; i++) {
+      sb.append("0123456789");
+    }
+    final String longSuffix = sb.toString();
+
+    for (int i = 0; i < 20; i++) {
+      purgeList.add(OzoneManagerProtocolProtos.PurgePathRequest.newBuilder().setVolumeId(volumeId).setBucketId(bucketId)
+          .setDeletedDir("dir-" + longSuffix + "-" + i).build());
+    }
+
+    org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId vbId =
+        new org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId(volumeId, bucketId);
+    OzoneManagerProtocolProtos.BucketNameInfo bni =
+        OzoneManagerProtocolProtos.BucketNameInfo.newBuilder().setVolumeId(volumeId).setBucketId(bucketId)
+            .setVolumeName("v").setBucketName("b").build();
+    Map<org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId, OzoneManagerProtocolProtos.BucketNameInfo>
+        bucketNameInfoMap = new HashMap<>();
+    bucketNameInfoMap.put(vbId, bni);
+
+    dds.optimizeDirDeletesAndSubmitRequest(0L, 0L, 0L, new ArrayList<>(), purgeList, null, Time.monotonicNow(), km,
+        kv -> true, kv -> true, bucketNameInfoMap, null, 1L, new AtomicInteger(Integer.MAX_VALUE));
+
+    assertThat(captured.size())
+        .as("Expect batching to respect Ratis byte limit")
+        .isBetween(3, 5);
+
+    for (OzoneManagerProtocolProtos.OMRequest omReq : captured) {
+      assertThat(omReq.getCmdType()).isEqualTo(OzoneManagerProtocolProtos.Type.PurgeDirectories);
+
+      OzoneManagerProtocolProtos.PurgeDirectoriesRequest purge = omReq.getPurgeDirectoriesRequest();
+      int payloadBytes =
+          purge.getDeletedPathList().stream().mapToInt(OzoneManagerProtocolProtos.PurgePathRequest::getSerializedSize)
+              .sum();
+
+      assertThat(payloadBytes).as("Batch size should respect Ratis byte limit").isLessThanOrEqualTo(ratisLimitBytes);
+    }
+
+    org.apache.commons.io.FileUtils.deleteDirectory(testDir);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to