liming30 commented on code in PR #1823:
URL: https://github.com/apache/incubator-paimon/pull/1823#discussion_r1295576061


##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -179,31 +191,57 @@ public void expireUntil(long earliestId, long endExclusiveId) {
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
             if (snapshot.changelogManifestList() != null) {
-                snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+                dataFileCleanFutures.add(
+                        snapshotDeletion.deleteAddedDataFilesAsync(
+                                snapshot.changelogManifestList(), ioExecutor));
             }
         }
 
         // data files and changelog files in bucket directories has been deleted
         // then delete changed bucket directories if they are empty
-        snapshotDeletion.cleanDataDirectories();
+        final Map<BinaryRow, Set<Integer>> deletionBuckets =
+                snapshotDeletion.getAndResetDeletionBuckets();
+        CompletableFuture<Void> cleanEmptyDirFuture =
+                CompletableFuture.allOf(dataFileCleanFutures.toArray(new CompletableFuture[0]))
+                        .thenRunAsync(
+                                () -> snapshotDeletion.cleanDataDirectories(deletionBuckets),
+                                ioExecutor);
 
         // delete manifests and indexFiles
-        List<Snapshot> skippingSnapshots =
-                TagManager.findOverlappedSnapshots(
-                        taggedSnapshots, beginInclusiveId, endExclusiveId);
-        skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
-        Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
-        for (long id = beginInclusiveId; id < endExclusiveId; id++) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Ready to delete manifests in snapshot #" + id);
-            }
-
-            Snapshot snapshot = snapshotManager.snapshot(id);
-            snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
-
-            // delete snapshot last
-            snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
-        }
+        final long startId = beginInclusiveId, endId = endExclusiveId;
+        cleanEmptyDirFuture.whenCompleteAsync(

Review Comment:
   I changed it so that the next `expire` waits for the previous `future` to 
complete before continuing. Please take a look when you have time.
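
   For illustration, a minimal sketch of that idea, assuming a single caller 
thread. The names `SequentialExpireSketch`, `previousExpire`, `waitForPrevious` 
and `events` are hypothetical and are not the code in this PR; `cleanup` stands 
in for the asynchronous file deletion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical sketch, not the PR's FileStoreExpireImpl.
class SequentialExpireSketch {
    private final Executor ioExecutor;
    private final List<String> events = new ArrayList<>();
    // Cleanup started by the previous expire() call; initially already complete.
    private CompletableFuture<Void> previousExpire = CompletableFuture.completedFuture(null);

    SequentialExpireSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    // Starts one expire round; it first waits for the previous round's cleanup.
    void expire(String name, Runnable cleanup) {
        waitForPrevious();
        record("start " + name);
        previousExpire =
                CompletableFuture.runAsync(
                        () -> {
                            cleanup.run();
                            record("done " + name);
                        },
                        ioExecutor);
    }

    // Blocks until the most recent round's cleanup has finished.
    void waitForPrevious() {
        try {
            previousExpire.join();
        } catch (CompletionException e) {
            // Assumption: a failed round should not block later rounds.
            // Real code would at least log the failure here.
        }
    }

    private synchronized void record(String event) {
        events.add(event);
    }

    // Returns a copy of the recorded events, in the order they happened.
    synchronized List<String> events() {
        return new ArrayList<>(events);
    }
}
```

   With this shape, a second `expire` call cannot start until the cleanup 
scheduled by the first call has finished, even when the executor has several 
threads.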



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
