Copilot commented on code in PR #8570: URL: https://github.com/apache/ozone/pull/8570#discussion_r2131178620
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java: ########## @@ -142,208 +156,274 @@ public void setRatisByteLimit(int ratisByteLimit) { @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); - if (taskCount.get() > 0) { - LOG.info("{} Directory deleting task(s) already in progress.", - taskCount.get()); - return queue; - } - try { - deletedDirSupplier.reInitItr(); - } catch (IOException ex) { - LOG.error("Unable to get the iterator.", ex); - return queue; - } - taskCount.set(dirDeletingCorePoolSize); - for (int i = 0; i < dirDeletingCorePoolSize; i++) { - queue.add(new DirectoryDeletingService.DirDeletingTask(this)); + queue.add(new DirDeletingTask(this, null)); + if (deepCleanSnapshots) { + Iterator<UUID> iterator = null; + try { + iterator = snapshotChainManager.iterator(true); + } catch (IOException e) { + LOG.error("Error while initializing snapshot chain iterator."); + return queue; + } + while (iterator.hasNext()) { + UUID snapshotId = iterator.next(); + queue.add(new DirDeletingTask(this, snapshotId)); + } } return queue; } @Override public void shutdown() { super.shutdown(); - deletedDirSupplier.closeItr(); } - private final class DeletedDirSupplier { + private static final class DeletedDirSupplier implements Closeable { private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> deleteTableIterator; - private synchronized Table.KeyValue<String, OmKeyInfo> get() - throws IOException { + private DeletedDirSupplier(TableIterator<String, ? 
extends KeyValue<String, OmKeyInfo>> deleteTableIterator) { + this.deleteTableIterator = deleteTableIterator; + } + + private synchronized Table.KeyValue<String, OmKeyInfo> get() { if (deleteTableIterator.hasNext()) { return deleteTableIterator.next(); } return null; } - private synchronized void closeItr() { + @Override + public void close() { IOUtils.closeQuietly(deleteTableIterator); - deleteTableIterator = null; - } - - private synchronized void reInitItr() throws IOException { - closeItr(); - deleteTableIterator = - getOzoneManager().getMetadataManager().getDeletedDirTable() - .iterator(); } } private final class DirDeletingTask implements BackgroundTask { private final DirectoryDeletingService directoryDeletingService; + private final UUID snapshotId; - private DirDeletingTask(DirectoryDeletingService service) { + private DirDeletingTask(DirectoryDeletingService service, UUID snapshotId) { this.directoryDeletingService = service; + this.snapshotId = snapshotId; } @Override public int getPriority() { return 0; } - @Override - public BackgroundTaskResult call() { - try { - if (shouldRun()) { - isRunningOnAOS.set(true); - long rnCnt = getRunCount().incrementAndGet(); - if (LOG.isDebugEnabled()) { - LOG.debug("Running DirectoryDeletingService. {}", rnCnt); - } - long dirNum = 0L; - long subDirNum = 0L; - long subFileNum = 0L; - long remainingBufLimit = ratisByteLimit; - int consumedSize = 0; - List<PurgePathRequest> purgePathRequestList = new ArrayList<>(); - List<Pair<String, OmKeyInfo>> allSubDirList = - new ArrayList<>(); - - Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo; - // This is to avoid race condition b/w purge request and snapshot chain updation. For AOS taking the global - // snapshotId since AOS could process multiple buckets in one iteration. 
- try { - UUID expectedPreviousSnapshotId = - ((OmMetadataManagerImpl) getOzoneManager().getMetadataManager()).getSnapshotChainManager() - .getLatestGlobalSnapshotId(); - - long startTime = Time.monotonicNow(); - while (remainingBufLimit > 0) { - pendingDeletedDirInfo = getPendingDeletedDirInfo(); - if (pendingDeletedDirInfo == null) { - break; - } - // Do not reclaim if the directory is still being referenced by - // the previous snapshot. - if (previousSnapshotHasDir(pendingDeletedDirInfo)) { - continue; - } + private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequestUpdatingExclusiveSize( + long exclusiveSize, long exclusiveReplicatedSize, UUID snapshotID) { + OzoneManagerProtocolProtos.SnapshotSize snapshotSize = OzoneManagerProtocolProtos.SnapshotSize.newBuilder() + .setExclusiveSize(exclusiveSize) + .setExclusiveReplicatedSize(exclusiveReplicatedSize) + .build(); + return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder() + .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID)) + .setSnapshotSizeDeltaFromDirDeepCleaning(snapshotSize) + .build(); + } - PurgePathRequest request = prepareDeleteDirRequest( - pendingDeletedDirInfo.getValue(), - pendingDeletedDirInfo.getKey(), allSubDirList, - getOzoneManager().getKeyManager(), remainingBufLimit); + /** + * + * @param currentSnapshotInfo if null, deleted directories in AOS should be processed. + * @param keyManager KeyManager of the underlying store. 
+ */ + private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, + long remainingBufLimit, long rnCnt) throws IOException, ExecutionException, InterruptedException { + String volume, bucket; String snapshotTableKey; + if (currentSnapshotInfo != null) { + volume = currentSnapshotInfo.getVolumeName(); + bucket = currentSnapshotInfo.getBucketName(); + snapshotTableKey = currentSnapshotInfo.getTableKey(); + } else { + volume = null; bucket = null; snapshotTableKey = null; + } - consumedSize += request.getSerializedSize(); - remainingBufLimit -= consumedSize; - purgePathRequestList.add(request); - // Count up the purgeDeletedDir, subDirs and subFiles - if (request.getDeletedDir() != null && !request.getDeletedDir() - .isEmpty()) { - dirNum++; - } - subDirNum += request.getMarkDeletedSubDirsCount(); - subFileNum += request.getDeletedSubFilesCount(); + try (DeletedDirSupplier dirSupplier = new DeletedDirSupplier(currentSnapshotInfo == null ? + keyManager.getDeletedDirEntries() : keyManager.getDeletedDirEntries(volume, bucket))) { + // This is to avoid race condition b/w purge request and snapshot chain update. For AOS taking the global + // snapshotId since AOS could process multiple buckets in one iteration. While using path + // previous snapshotId for a snapshot since it would process only one bucket. + UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ? 
+ snapshotChainManager.getLatestGlobalSnapshotId() : + SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, snapshotChainManager); + Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap(); + + CompletableFuture<Boolean> processedAllDeletedDirs = CompletableFuture.completedFuture(true); + for (int i = 0; i < numberOfParallelThreadsPerStore; i++) { + CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { + try { + return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, remainingBufLimit, + expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt); + } catch (Throwable e) { + return false; } - - optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum, - subFileNum, allSubDirList, purgePathRequestList, null, - startTime, remainingBufLimit, - getOzoneManager().getKeyManager(), expectedPreviousSnapshotId, - rnCnt); - - } catch (IOException e) { - LOG.error( - "Error while running delete directories and files " + "background task. Will retry at next run.", - e); + }, deletionThreadPool); + processedAllDeletedDirs = future.thenCombine(future, (a, b) -> a && b); + } Review Comment: `future.thenCombine(future, (a, b) -> a && b)` combines each future with itself rather than with the accumulated result. Because the result overwrites `processedAllDeletedDirs` on every iteration, only the last task's result is retained, and failures reported by earlier tasks are silently lost. There are two ways to fix this. The first is to chain onto the accumulated result: `processedAllDeletedDirs = processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b);`. The second is to collect all futures into a list and use `CompletableFuture.allOf()` to wait for them and aggregate their results. The suggestion below takes the second approach. For it to compile, you also need to declare `List<CompletableFuture<Boolean>> futures = new ArrayList<>();` before the loop and remove the earlier `processedAllDeletedDirs` declaration. ```suggestion futures.add(future); } CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture<Boolean> processedAllDeletedDirs = allFutures.thenApply(v -> futures.stream().map(CompletableFuture::join).reduce(true, (a, b) -> a && b)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org