Copilot commented on code in PR #8570:
URL: https://github.com/apache/ozone/pull/8570#discussion_r2131178620


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java:
##########
@@ -142,208 +156,274 @@ public void setRatisByteLimit(int ratisByteLimit) {
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    if (taskCount.get() > 0) {
-      LOG.info("{} Directory deleting task(s) already in progress.",
-          taskCount.get());
-      return queue;
-    }
-    try {
-      deletedDirSupplier.reInitItr();
-    } catch (IOException ex) {
-      LOG.error("Unable to get the iterator.", ex);
-      return queue;
-    }
-    taskCount.set(dirDeletingCorePoolSize);
-    for (int i = 0; i < dirDeletingCorePoolSize; i++) {
-      queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+    queue.add(new DirDeletingTask(this, null));
+    if (deepCleanSnapshots) {
+      Iterator<UUID> iterator = null;
+      try {
+        iterator = snapshotChainManager.iterator(true);
+      } catch (IOException e) {
+        LOG.error("Error while initializing snapshot chain iterator.");
+        return queue;
+      }
+      while (iterator.hasNext()) {
+        UUID snapshotId = iterator.next();
+        queue.add(new DirDeletingTask(this, snapshotId));
+      }
     }
     return queue;
   }
 
   @Override
   public void shutdown() {
     super.shutdown();
-    deletedDirSupplier.closeItr();
   }
 
-  private final class DeletedDirSupplier {
+  private static final class DeletedDirSupplier implements Closeable {
     private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
         deleteTableIterator;
 
-    private synchronized Table.KeyValue<String, OmKeyInfo> get()
-        throws IOException {
+    private DeletedDirSupplier(TableIterator<String, ? extends 
KeyValue<String, OmKeyInfo>> deleteTableIterator) {
+      this.deleteTableIterator = deleteTableIterator;
+    }
+
+    private synchronized Table.KeyValue<String, OmKeyInfo> get() {
       if (deleteTableIterator.hasNext()) {
         return deleteTableIterator.next();
       }
       return null;
     }
 
-    private synchronized void closeItr() {
+    @Override
+    public void close() {
       IOUtils.closeQuietly(deleteTableIterator);
-      deleteTableIterator = null;
-    }
-
-    private synchronized void reInitItr() throws IOException {
-      closeItr();
-      deleteTableIterator =
-          getOzoneManager().getMetadataManager().getDeletedDirTable()
-              .iterator();
     }
   }
 
   private final class DirDeletingTask implements BackgroundTask {
     private final DirectoryDeletingService directoryDeletingService;
+    private final UUID snapshotId;
 
-    private DirDeletingTask(DirectoryDeletingService service) {
+    private DirDeletingTask(DirectoryDeletingService service, UUID snapshotId) 
{
       this.directoryDeletingService = service;
+      this.snapshotId = snapshotId;
     }
 
     @Override
     public int getPriority() {
       return 0;
     }
 
-    @Override
-    public BackgroundTaskResult call() {
-      try {
-        if (shouldRun()) {
-          isRunningOnAOS.set(true);
-          long rnCnt = getRunCount().incrementAndGet();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Running DirectoryDeletingService. {}", rnCnt);
-          }
-          long dirNum = 0L;
-          long subDirNum = 0L;
-          long subFileNum = 0L;
-          long remainingBufLimit = ratisByteLimit;
-          int consumedSize = 0;
-          List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
-          List<Pair<String, OmKeyInfo>> allSubDirList =
-              new ArrayList<>();
-
-          Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
-          // This is to avoid race condition b/w purge request and snapshot 
chain updation. For AOS taking the global
-          // snapshotId since AOS could process multiple buckets in one 
iteration.
-          try {
-            UUID expectedPreviousSnapshotId =
-                ((OmMetadataManagerImpl) 
getOzoneManager().getMetadataManager()).getSnapshotChainManager()
-                    .getLatestGlobalSnapshotId();
-
-            long startTime = Time.monotonicNow();
-            while (remainingBufLimit > 0) {
-              pendingDeletedDirInfo = getPendingDeletedDirInfo();
-              if (pendingDeletedDirInfo == null) {
-                break;
-              }
-              // Do not reclaim if the directory is still being referenced by
-              // the previous snapshot.
-              if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
-                continue;
-              }
+    private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest 
getSetSnapshotRequestUpdatingExclusiveSize(
+        long exclusiveSize, long exclusiveReplicatedSize, UUID snapshotID) {
+      OzoneManagerProtocolProtos.SnapshotSize snapshotSize = 
OzoneManagerProtocolProtos.SnapshotSize.newBuilder()
+          .setExclusiveSize(exclusiveSize)
+          .setExclusiveReplicatedSize(exclusiveReplicatedSize)
+          .build();
+      return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+          .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID))
+          .setSnapshotSizeDeltaFromDirDeepCleaning(snapshotSize)
+          .build();
+    }
 
-              PurgePathRequest request = prepareDeleteDirRequest(
-                  pendingDeletedDirInfo.getValue(),
-                  pendingDeletedDirInfo.getKey(), allSubDirList,
-                  getOzoneManager().getKeyManager(), remainingBufLimit);
+    /**
+     *
+     * @param currentSnapshotInfo if null, deleted directories in AOS should 
be processed.
+     * @param keyManager KeyManager of the underlying store.
+     */
+    private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, 
KeyManager keyManager,
+        long remainingBufLimit, long rnCnt) throws IOException, 
ExecutionException, InterruptedException {
+      String volume, bucket; String snapshotTableKey;
+      if (currentSnapshotInfo != null) {
+        volume = currentSnapshotInfo.getVolumeName();
+        bucket = currentSnapshotInfo.getBucketName();
+        snapshotTableKey = currentSnapshotInfo.getTableKey();
+      } else {
+        volume = null; bucket = null; snapshotTableKey = null;
+      }
 
-              consumedSize += request.getSerializedSize();
-              remainingBufLimit -= consumedSize;
-              purgePathRequestList.add(request);
-              // Count up the purgeDeletedDir, subDirs and subFiles
-              if (request.getDeletedDir() != null && !request.getDeletedDir()
-                  .isEmpty()) {
-                dirNum++;
-              }
-              subDirNum += request.getMarkDeletedSubDirsCount();
-              subFileNum += request.getDeletedSubFilesCount();
+      try (DeletedDirSupplier dirSupplier = new 
DeletedDirSupplier(currentSnapshotInfo == null ?
+          keyManager.getDeletedDirEntries() : 
keyManager.getDeletedDirEntries(volume, bucket))) {
+        // This is to avoid race condition b/w purge request and snapshot 
chain update. For AOS taking the global
+        // snapshotId since AOS could process multiple buckets in one 
iteration. While using path
+        // previous snapshotId for a snapshot since it would process only one 
bucket.
+        UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ?
+            snapshotChainManager.getLatestGlobalSnapshotId() :
+            SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, 
snapshotChainManager);
+        Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap();
+
+        CompletableFuture<Boolean> processedAllDeletedDirs = 
CompletableFuture.completedFuture(true);
+        for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
+          CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() 
-> {
+            try {
+              return processDeletedDirectories(currentSnapshotInfo, 
keyManager, dirSupplier, remainingBufLimit,
+                  expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt);
+            } catch (Throwable e) {
+              return false;
             }
-
-            optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
-                subFileNum, allSubDirList, purgePathRequestList, null,
-                startTime, remainingBufLimit,
-                getOzoneManager().getKeyManager(), expectedPreviousSnapshotId,
-                rnCnt);
-
-          } catch (IOException e) {
-            LOG.error(
-                "Error while running delete directories and files " + 
"background task. Will retry at next run.",
-                e);
+          }, deletionThreadPool);
+          processedAllDeletedDirs = future.thenCombine(future, (a, b) -> a && 
b);
+        }

Review Comment:
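   `future.thenCombine(future, ...)` combines each task's future with itself, so
   `processedAllDeletedDirs` is overwritten on every loop iteration and reflects
   only the last task's result instead of accumulating results across all
   asynchronous tasks. Consider collecting all futures into a list and using
   CompletableFuture.allOf() to aggregate the results for correct parallel
   execution handling.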
   ```suggestion
             futures.add(future);
           }
           CompletableFuture<Void> allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
           CompletableFuture<Boolean> processedAllDeletedDirs = 
allFutures.thenApply(v -> 
               futures.stream().map(CompletableFuture::join).reduce(true, (a, 
b) -> a && b));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to